You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/02 01:32:12 UTC

[5/5] mesos git commit: Added `getService()` function to launch CSI plugins.

Added `getService()` function to launch CSI plugins.

The `getService()` method first checks if there is already a container
daemon for the specified plugin component, and creates a new one if not.
The post-start hook for the container daemon will call `connect()` to
wait for the endpoint socket file to appear and connect to it, then
fulfill the corresponding CSI client promise. The post-stop hook will
remove the socket file and create a new promise for the next call to
the post-start hook to fulfill.

Review: https://reviews.apache.org/r/63021/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/434a0941
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/434a0941
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/434a0941

Branch: refs/heads/master
Commit: 434a09410b8ecf4c163ddba441db40cc1a2f711d
Parents: 619f4ae
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Dec 1 15:11:39 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Dec 1 17:32:03 2017 -0800

----------------------------------------------------------------------
 include/mesos/type_utils.hpp               |   5 +
 include/mesos/v1/mesos.hpp                 |   5 +
 src/common/type_utils.cpp                  |   8 +
 src/resource_provider/storage/provider.cpp | 317 +++++++++++++++++++++++-
 src/v1/mesos.cpp                           |   8 +
 5 files changed, 342 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index a348c7d..d28d538 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -429,6 +429,11 @@ std::ostream& operator<<(std::ostream& stream, const CheckInfo::Type& type);
 
 std::ostream& operator<<(
     std::ostream& stream,
+    const CSIPluginContainerInfo::Service& service);
+
+
+std::ostream& operator<<(
+    std::ostream& stream,
     const FrameworkInfo::Capability& capability);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/include/mesos/v1/mesos.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.hpp b/include/mesos/v1/mesos.hpp
index d1401fb..de61c36 100644
--- a/include/mesos/v1/mesos.hpp
+++ b/include/mesos/v1/mesos.hpp
@@ -423,6 +423,11 @@ std::ostream& operator<<(std::ostream& stream, const CheckInfo::Type& type);
 
 std::ostream& operator<<(
     std::ostream& stream,
+    const CSIPluginContainerInfo::Service& service);
+
+
+std::ostream& operator<<(
+    std::ostream& stream,
     const FrameworkInfo::Capability& capability);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/src/common/type_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.cpp b/src/common/type_utils.cpp
index 3657d55..a272221 100644
--- a/src/common/type_utils.cpp
+++ b/src/common/type_utils.cpp
@@ -743,6 +743,14 @@ ostream& operator<<(ostream& stream, const CheckInfo::Type& type)
 
 ostream& operator<<(
     ostream& stream,
+    const CSIPluginContainerInfo::Service& service)
+{
+  return stream << CSIPluginContainerInfo::Service_Name(service);
+}
+
+
+ostream& operator<<(
+    ostream& stream,
     const FrameworkInfo::Capability& capability)
 {
   return stream << FrameworkInfo::Capability::Type_Name(capability.type());

http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index eee857b..bbf168a 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -16,48 +16,72 @@
 
 #include "resource_provider/storage/provider.hpp"
 
+#include <algorithm>
 #include <cctype>
 
 #include <glog/logging.h>
 
+#include <process/after.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
 #include <process/id.hpp>
+#include <process/loop.hpp>
 #include <process/process.hpp>
+#include <process/timeout.hpp>
+
+#include <mesos/type_utils.hpp>
 
 #include <mesos/resource_provider/resource_provider.hpp>
 
 #include <mesos/v1/resource_provider.hpp>
 
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
 #include <stout/path.hpp>
 
+#include <stout/os/exists.hpp>
+#include <stout/os/mkdir.hpp>
 #include <stout/os/realpath.hpp>
+#include <stout/os/rm.hpp>
 
 #include "common/http.hpp"
 
+#include "csi/client.hpp"
+#include "csi/paths.hpp"
+#include "csi/utils.hpp"
+
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
 
 #include "resource_provider/detector.hpp"
 
+#include "slave/container_daemon.hpp"
 #include "slave/paths.hpp"
 
 namespace http = process::http;
 
+using std::find;
 using std::queue;
 using std::string;
 
+using process::Break;
+using process::Continue;
+using process::ControlFlow;
 using process::Failure;
 using process::Future;
 using process::Owned;
 using process::Process;
+using process::Promise;
+using process::Timeout;
 
+using process::after;
 using process::defer;
+using process::loop;
 using process::spawn;
 
 using process::http::authentication::Principal;
 
-using mesos::ResourceProviderInfo;
+using mesos::internal::slave::ContainerDaemon;
 
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
@@ -101,6 +125,10 @@ static bool isValidType(const string& s)
 }
 
 
+// Timeout for a CSI plugin component to create its endpoint socket.
+static const Duration CSI_ENDPOINT_CREATION_TIMEOUT = Seconds(5);
+
+
 // Returns a prefix for naming standalone containers to run CSI plugins
 // for the resource provider. The prefix is of the following format:
 //     <rp_type>-<rp_name>--
@@ -117,6 +145,64 @@ static inline string getContainerIdPrefix(const ResourceProviderInfo& info)
 }
 
 
+// Returns the container ID of the standalone container to run a CSI
+// plugin component. The container ID is of the following format:
+//     <rp_type>-<rp_name>--<csi_type>-<csi_name>--<list_of_services>
+// where <rp_type> and <rp_name> are the type and name of the resource
+// provider, <csi_type> is the type of the CSI plugin with dots replaced
+// by dashes, and <csi_name> is its name. <list_of_services> lists the
+// CSI services provided by the component, concatenated with dashes.
+static inline ContainerID getContainerId(
+    const ResourceProviderInfo& info,
+    const CSIPluginContainerInfo& container)
+{
+  string value = getContainerIdPrefix(info);
+
+  value += strings::join(
+      "-",
+      strings::replace(info.storage().type(), ".", "-"),
+      info.storage().name(),
+      "");
+
+  for (int i = 0; i < container.services_size(); i++) {
+    value += "-" + stringify(container.services(i));
+  }
+
+  ContainerID containerId;
+  containerId.set_value(value);
+
+  return containerId;
+}
+
+
+static Option<CSIPluginContainerInfo> getCSIPluginContainerInfo(
+    const ResourceProviderInfo& info,
+    const ContainerID& containerId)
+{
+  foreach (const CSIPluginContainerInfo& container,
+           info.storage().containers()) {
+    if (getContainerId(info, container) == containerId) {
+      return container;
+    }
+  }
+
+  return None();
+}
+
+
+// Returns the parent endpoint as a URL.
+// TODO(jieyu): Consider using a more reliable way to get the agent v1
+// operator API endpoint URL.
+static inline http::URL extractParentEndpoint(const http::URL& url)
+{
+  http::URL parent = url;
+
+  parent.path = Path(url.path).dirname();
+
+  return parent;
+}
+
+
 class StorageLocalResourceProviderProcess
   : public Process<StorageLocalResourceProviderProcess>
 {
@@ -159,6 +245,9 @@ private:
   void operation(const Event::Operation& operation);
   void publish(const Event::Publish& publish);
 
+  Future<csi::Client> connect(const string& endpoint);
+  Future<csi::Client> getService(const ContainerID& containerId);
+
   enum State
   {
     RECOVERING,
@@ -176,7 +265,14 @@ private:
   const SlaveID slaveId;
   const Option<string> authToken;
 
+  csi::Version csiVersion;
+  process::grpc::client::Runtime runtime;
   Owned<v1::resource_provider::Driver> driver;
+
+  ContainerID controllerContainerId;
+  ContainerID nodeContainerId;
+  hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
+  hashmap<ContainerID, Owned<Promise<csi::Client>>> services;
 };
 
 
@@ -230,6 +326,35 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 
 void StorageLocalResourceProviderProcess::initialize()
 {
+  // Set CSI version to 0.1.0.
+  csiVersion.set_major(0);
+  csiVersion.set_minor(1);
+  csiVersion.set_patch(0);
+
+  foreach (const CSIPluginContainerInfo& container,
+           info.storage().containers()) {
+    auto it = find(
+        container.services().begin(),
+        container.services().end(),
+        CSIPluginContainerInfo::CONTROLLER_SERVICE);
+    if (it != container.services().end()) {
+      controllerContainerId = getContainerId(info, container);
+      break;
+    }
+  }
+
+  foreach (const CSIPluginContainerInfo& container,
+           info.storage().containers()) {
+    auto it = find(
+        container.services().begin(),
+        container.services().end(),
+        CSIPluginContainerInfo::NODE_SERVICE);
+    if (it != container.services().end()) {
+      nodeContainerId = getContainerId(info, container);
+      break;
+    }
+  }
+
   const string message =
     "Failed to recover resource provider with type '" + info.type() +
     "' and name '" + info.name() + "'";
@@ -365,6 +490,196 @@ void StorageLocalResourceProviderProcess::publish(const Event::Publish& publish)
 }
 
 
+// Returns a future of a CSI client that waits for the endpoint socket
+// to appear if necessary, then connects to the socket and checks that
+// the plugin supports the CSI version in use.
+Future<csi::Client> StorageLocalResourceProviderProcess::connect(
+    const string& endpoint)
+{
+  Future<csi::Client> client;
+
+  if (os::exists(endpoint)) {
+    client = csi::Client("unix://" + endpoint, runtime);
+  } else {
+    // Wait for the endpoint socket to appear until the timeout expires.
+    Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
+
+    client = loop(
+        self(),
+        [=]() -> Future<Nothing> {
+          if (timeout.expired()) {
+            return Failure("Timed out waiting for endpoint '" + endpoint + "'");
+          }
+
+          return after(Milliseconds(10));
+        },
+        [=](const Nothing&) -> ControlFlow<csi::Client> {
+          if (os::exists(endpoint)) {
+            return Break(csi::Client("unix://" + endpoint, runtime));
+          }
+
+          return Continue();
+        });
+  }
+
+  return client
+    .then(defer(self(), [=](csi::Client client) {
+      return client.GetSupportedVersions(csi::GetSupportedVersionsRequest())
+        .then(defer(self(), [=](
+            const csi::GetSupportedVersionsResponse& response)
+            -> Future<csi::Client> {
+          auto it = find(
+              response.supported_versions().begin(),
+              response.supported_versions().end(),
+              csiVersion);
+          if (it == response.supported_versions().end()) {
+            return Failure(
+                "CSI version " + stringify(csiVersion) + " is not supported");
+          }
+
+          return client;
+        }));
+    }));
+}
+
+
+// Returns a future of the latest CSI client for the specified plugin
+// container. If the container is not already running, this method will
+// start a new container daemon.
+Future<csi::Client> StorageLocalResourceProviderProcess::getService(
+    const ContainerID& containerId)
+{
+  if (daemons.contains(containerId)) {
+    CHECK(services.contains(containerId));
+    return services.at(containerId)->future();
+  }
+
+  Option<CSIPluginContainerInfo> config =
+    getCSIPluginContainerInfo(info, containerId);
+
+  CHECK_SOME(config);
+
+  CommandInfo commandInfo;
+
+  if (config->has_command()) {
+    commandInfo.CopyFrom(config->command());
+  }
+
+  // Set the `CSI_ENDPOINT` environment variable.
+  Try<string> endpoint = csi::paths::getEndpointSocketPath(
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().type(),
+      info.storage().name(),
+      containerId);
+
+  if (endpoint.isError()) {
+    return Failure(
+        "Failed to resolve endpoint path for plugin container '" +
+        stringify(containerId) + "': " + endpoint.error());
+  }
+
+  const string& endpointPath = endpoint.get();
+  Environment::Variable* endpointVar =
+    commandInfo.mutable_environment()->add_variables();
+  endpointVar->set_name("CSI_ENDPOINT");
+  endpointVar->set_value("unix://" + endpointPath);
+
+  ContainerInfo containerInfo;
+
+  if (config->has_container()) {
+    containerInfo.CopyFrom(config->container());
+  } else {
+    containerInfo.set_type(ContainerInfo::MESOS);
+  }
+
+  // Prepare a volume where the endpoint socket will be placed.
+  const string endpointDir = Path(endpointPath).dirname();
+  Volume* endpointVolume = containerInfo.add_volumes();
+  endpointVolume->set_mode(Volume::RW);
+  endpointVolume->set_container_path(endpointDir);
+  endpointVolume->set_host_path(endpointDir);
+
+  // Prepare the directory where the mount points will be placed.
+  const string mountDir = csi::paths::getMountRootDir(
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().type(),
+      info.storage().name());
+
+  Try<Nothing> mkdir = os::mkdir(mountDir);
+  if (mkdir.isError()) {
+    return Failure(
+        "Failed to create directory '" + mountDir +
+        "': " + mkdir.error());
+  }
+
+  // Prepare a volume where the mount points will be placed.
+  Volume* mountVolume = containerInfo.add_volumes();
+  mountVolume->set_mode(Volume::RW);
+  mountVolume->set_container_path(mountDir);
+  mountVolume->mutable_source()->set_type(Volume::Source::HOST_PATH);
+  mountVolume->mutable_source()->mutable_host_path()->set_path(mountDir);
+  mountVolume->mutable_source()->mutable_host_path()
+    ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL);
+
+  CHECK(!services.contains(containerId));
+  services[containerId].reset(new Promise<csi::Client>());
+
+  Try<Owned<ContainerDaemon>> daemon = ContainerDaemon::create(
+      extractParentEndpoint(url),
+      authToken,
+      containerId,
+      commandInfo,
+      config->resources(),
+      containerInfo,
+      std::function<Future<Nothing>()>(defer(self(), [=]() {
+        CHECK(services.at(containerId)->future().isPending());
+
+        return connect(endpointPath)
+          .then(defer(self(), [=](const csi::Client& client) {
+            services.at(containerId)->set(client);
+            return Nothing();
+          }))
+          .onFailed(defer(self(), [=](const string& failure) {
+            services.at(containerId)->fail(failure);
+          }))
+          .onDiscarded(defer(self(), [=] {
+            services.at(containerId)->discard();
+          }));
+      })),
+      std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
+        services.at(containerId)->discard();
+        services.at(containerId).reset(new Promise<csi::Client>());
+
+        if (os::exists(endpointPath)) {
+          Try<Nothing> rm = os::rm(endpointPath);
+          if (rm.isError()) {
+            return Failure(
+                "Failed to remove endpoint '" + endpointPath +
+                "': " + rm.error());
+          }
+        }
+
+        return Nothing();
+      })));
+
+  if (daemon.isError()) {
+    return Failure(
+        "Failed to create container daemon for plugin container '" +
+        stringify(containerId) + "': " + daemon.error());
+  }
+
+  const string message =
+    "Container daemon for '" + stringify(containerId) + "' failed";
+
+  daemons[containerId] = daemon.get();
+  daemon.get()->wait()
+    .onFailed(defer(self(), &Self::fatal, message, lambda::_1))
+    .onDiscarded(defer(self(), &Self::fatal, message, "future discarded"));
+
+  return services.at(containerId)->future();
+}
+
+
 Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
     const http::URL& url,
     const string& workDir,

http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/src/v1/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/v1/mesos.cpp b/src/v1/mesos.cpp
index 15eb848..8369d19 100644
--- a/src/v1/mesos.cpp
+++ b/src/v1/mesos.cpp
@@ -651,6 +651,14 @@ ostream& operator<<(ostream& stream, const CheckInfo::Type& type)
 
 ostream& operator<<(
     ostream& stream,
+    const CSIPluginContainerInfo::Service& service)
+{
+  return stream << CSIPluginContainerInfo::Service_Name(service);
+}
+
+
+ostream& operator<<(
+    ostream& stream,
     const FrameworkInfo::Capability& capability)
 {
   return stream << FrameworkInfo::Capability::Type_Name(capability.type());