You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/02 01:32:12 UTC
[5/5] mesos git commit: Added `getService()` function to launch CSI
plugins.
Added `getService()` function to launch CSI plugins.
The `getService()` method first checks if there is already a container
daemon for the specified plugin component, and creates a new one if not.
The post-start hook for the container daemon will call `connect()` to
wait for the endpoint socket file to appear and connect to it, then
set up the corresponding promise of the CSI client. The post-stop hook will
remove the socket file and create a new promise for the next call to the
post-start hook to set it up.
Review: https://reviews.apache.org/r/63021/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/434a0941
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/434a0941
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/434a0941
Branch: refs/heads/master
Commit: 434a09410b8ecf4c163ddba441db40cc1a2f711d
Parents: 619f4ae
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Dec 1 15:11:39 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Dec 1 17:32:03 2017 -0800
----------------------------------------------------------------------
include/mesos/type_utils.hpp | 5 +
include/mesos/v1/mesos.hpp | 5 +
src/common/type_utils.cpp | 8 +
src/resource_provider/storage/provider.cpp | 317 +++++++++++++++++++++++-
src/v1/mesos.cpp | 8 +
5 files changed, 342 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index a348c7d..d28d538 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -429,6 +429,11 @@ std::ostream& operator<<(std::ostream& stream, const CheckInfo::Type& type);
std::ostream& operator<<(
std::ostream& stream,
+ const CSIPluginContainerInfo::Service& service);
+
+
+std::ostream& operator<<(
+ std::ostream& stream,
const FrameworkInfo::Capability& capability);
http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/include/mesos/v1/mesos.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.hpp b/include/mesos/v1/mesos.hpp
index d1401fb..de61c36 100644
--- a/include/mesos/v1/mesos.hpp
+++ b/include/mesos/v1/mesos.hpp
@@ -423,6 +423,11 @@ std::ostream& operator<<(std::ostream& stream, const CheckInfo::Type& type);
std::ostream& operator<<(
std::ostream& stream,
+ const CSIPluginContainerInfo::Service& service);
+
+
+std::ostream& operator<<(
+ std::ostream& stream,
const FrameworkInfo::Capability& capability);
http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/src/common/type_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.cpp b/src/common/type_utils.cpp
index 3657d55..a272221 100644
--- a/src/common/type_utils.cpp
+++ b/src/common/type_utils.cpp
@@ -743,6 +743,14 @@ ostream& operator<<(ostream& stream, const CheckInfo::Type& type)
ostream& operator<<(
ostream& stream,
+ const CSIPluginContainerInfo::Service& service)
+{
+ return stream << CSIPluginContainerInfo::Service_Name(service);
+}
+
+
+ostream& operator<<(
+ ostream& stream,
const FrameworkInfo::Capability& capability)
{
return stream << FrameworkInfo::Capability::Type_Name(capability.type());
http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index eee857b..bbf168a 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -16,48 +16,72 @@
#include "resource_provider/storage/provider.hpp"
+#include <algorithm>
#include <cctype>
#include <glog/logging.h>
+#include <process/after.hpp>
#include <process/defer.hpp>
#include <process/delay.hpp>
#include <process/id.hpp>
+#include <process/loop.hpp>
#include <process/process.hpp>
+#include <process/timeout.hpp>
+
+#include <mesos/type_utils.hpp>
#include <mesos/resource_provider/resource_provider.hpp>
#include <mesos/v1/resource_provider.hpp>
+#include <stout/foreach.hpp>
+#include <stout/hashmap.hpp>
#include <stout/path.hpp>
+#include <stout/os/exists.hpp>
+#include <stout/os/mkdir.hpp>
#include <stout/os/realpath.hpp>
+#include <stout/os/rm.hpp>
#include "common/http.hpp"
+#include "csi/client.hpp"
+#include "csi/paths.hpp"
+#include "csi/utils.hpp"
+
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
#include "resource_provider/detector.hpp"
+#include "slave/container_daemon.hpp"
#include "slave/paths.hpp"
namespace http = process::http;
+using std::find;
using std::queue;
using std::string;
+using process::Break;
+using process::Continue;
+using process::ControlFlow;
using process::Failure;
using process::Future;
using process::Owned;
using process::Process;
+using process::Promise;
+using process::Timeout;
+using process::after;
using process::defer;
+using process::loop;
using process::spawn;
using process::http::authentication::Principal;
-using mesos::ResourceProviderInfo;
+using mesos::internal::slave::ContainerDaemon;
using mesos::resource_provider::Call;
using mesos::resource_provider::Event;
@@ -101,6 +125,10 @@ static bool isValidType(const string& s)
}
+// Timeout for a CSI plugin component to create its endpoint socket.
+static const Duration CSI_ENDPOINT_CREATION_TIMEOUT = Seconds(5);
+
+
// Returns a prefix for naming standalone containers to run CSI plugins
// for the resource provider. The prefix is of the following format:
// <rp_type>-<rp_name>--
@@ -117,6 +145,64 @@ static inline string getContainerIdPrefix(const ResourceProviderInfo& info)
}
+// Returns the container ID of the standalone container to run a CSI
+// plugin component. The container ID is of the following format:
+// <rp_type>-<rp_name>--<csi_type>-<csi_name>--<list_of_services>
+// where <rp_type> and <rp_name> are the type and name of the resource
+// provider, and <csi_type> and <csi_name> are the type and name of the
+// CSI plugin, with dots replaced by dashes. <list_of_services> lists
+// the CSI services provided by the component, concatenated with dashes.
+static inline ContainerID getContainerId(
+ const ResourceProviderInfo& info,
+ const CSIPluginContainerInfo& container)
+{
+ string value = getContainerIdPrefix(info);
+
+ value += strings::join(
+ "-",
+ strings::replace(info.storage().type(), ".", "-"),
+ info.storage().name(),
+ "");
+
+ for (int i = 0; i < container.services_size(); i++) {
+ value += "-" + stringify(container.services(i));
+ }
+
+ ContainerID containerId;
+ containerId.set_value(value);
+
+ return containerId;
+}
+
+
+static Option<CSIPluginContainerInfo> getCSIPluginContainerInfo(
+ const ResourceProviderInfo& info,
+ const ContainerID& containerId)
+{
+ foreach (const CSIPluginContainerInfo& container,
+ info.storage().containers()) {
+ if (getContainerId(info, container) == containerId) {
+ return container;
+ }
+ }
+
+ return None();
+}
+
+
+// Returns the parent endpoint as a URL.
+// TODO(jieyu): Consider using a more reliable way to get the agent v1
+// operator API endpoint URL.
+static inline http::URL extractParentEndpoint(const http::URL& url)
+{
+ http::URL parent = url;
+
+ parent.path = Path(url.path).dirname();
+
+ return parent;
+}
+
+
class StorageLocalResourceProviderProcess
: public Process<StorageLocalResourceProviderProcess>
{
@@ -159,6 +245,9 @@ private:
void operation(const Event::Operation& operation);
void publish(const Event::Publish& publish);
+ Future<csi::Client> connect(const string& endpoint);
+ Future<csi::Client> getService(const ContainerID& containerId);
+
enum State
{
RECOVERING,
@@ -176,7 +265,14 @@ private:
const SlaveID slaveId;
const Option<string> authToken;
+ csi::Version csiVersion;
+ process::grpc::client::Runtime runtime;
Owned<v1::resource_provider::Driver> driver;
+
+ ContainerID controllerContainerId;
+ ContainerID nodeContainerId;
+ hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
+ hashmap<ContainerID, Owned<Promise<csi::Client>>> services;
};
@@ -230,6 +326,35 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
void StorageLocalResourceProviderProcess::initialize()
{
+ // Set CSI version to 0.1.0.
+ csiVersion.set_major(0);
+ csiVersion.set_minor(1);
+ csiVersion.set_patch(0);
+
+ foreach (const CSIPluginContainerInfo& container,
+ info.storage().containers()) {
+ auto it = find(
+ container.services().begin(),
+ container.services().end(),
+ CSIPluginContainerInfo::CONTROLLER_SERVICE);
+ if (it != container.services().end()) {
+ controllerContainerId = getContainerId(info, container);
+ break;
+ }
+ }
+
+ foreach (const CSIPluginContainerInfo& container,
+ info.storage().containers()) {
+ auto it = find(
+ container.services().begin(),
+ container.services().end(),
+ CSIPluginContainerInfo::NODE_SERVICE);
+ if (it != container.services().end()) {
+ nodeContainerId = getContainerId(info, container);
+ break;
+ }
+ }
+
const string message =
"Failed to recover resource provider with type '" + info.type() +
"' and name '" + info.name() + "'";
@@ -365,6 +490,196 @@ void StorageLocalResourceProviderProcess::publish(const Event::Publish& publish)
}
+// Returns a future of a CSI client that waits for the endpoint socket
+// to appear if necessary, then connects to the socket and checks that
+// the plugin supports the required CSI version.
+Future<csi::Client> StorageLocalResourceProviderProcess::connect(
+ const string& endpoint)
+{
+ Future<csi::Client> client;
+
+ if (os::exists(endpoint)) {
+ client = csi::Client("unix://" + endpoint, runtime);
+ } else {
+ // Wait for the endpoint socket to appear until the timeout expires.
+ Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
+
+ client = loop(
+ self(),
+ [=]() -> Future<Nothing> {
+ if (timeout.expired()) {
+ return Failure("Timed out waiting for endpoint '" + endpoint + "'");
+ }
+
+ return after(Milliseconds(10));
+ },
+ [=](const Nothing&) -> ControlFlow<csi::Client> {
+ if (os::exists(endpoint)) {
+ return Break(csi::Client("unix://" + endpoint, runtime));
+ }
+
+ return Continue();
+ });
+ }
+
+ return client
+ .then(defer(self(), [=](csi::Client client) {
+ return client.GetSupportedVersions(csi::GetSupportedVersionsRequest())
+ .then(defer(self(), [=](
+ const csi::GetSupportedVersionsResponse& response)
+ -> Future<csi::Client> {
+ auto it = find(
+ response.supported_versions().begin(),
+ response.supported_versions().end(),
+ csiVersion);
+ if (it == response.supported_versions().end()) {
+ return Failure(
+ "CSI version " + stringify(csiVersion) + " is not supported");
+ }
+
+ return client;
+ }));
+ }));
+}
+
+
+// Returns a future of the latest CSI client for the specified plugin
+// container. If the container is not already running, this method will
+// start a new container daemon.
+Future<csi::Client> StorageLocalResourceProviderProcess::getService(
+ const ContainerID& containerId)
+{
+ if (daemons.contains(containerId)) {
+ CHECK(services.contains(containerId));
+ return services.at(containerId)->future();
+ }
+
+ Option<CSIPluginContainerInfo> config =
+ getCSIPluginContainerInfo(info, containerId);
+
+ CHECK_SOME(config);
+
+ CommandInfo commandInfo;
+
+ if (config->has_command()) {
+ commandInfo.CopyFrom(config->command());
+ }
+
+ // Set the `CSI_ENDPOINT` environment variable.
+ Try<string> endpoint = csi::paths::getEndpointSocketPath(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().type(),
+ info.storage().name(),
+ containerId);
+
+ if (endpoint.isError()) {
+ return Failure(
+ "Failed to resolve endpoint path for plugin container '" +
+ stringify(containerId) + "': " + endpoint.error());
+ }
+
+ const string& endpointPath = endpoint.get();
+ Environment::Variable* endpointVar =
+ commandInfo.mutable_environment()->add_variables();
+ endpointVar->set_name("CSI_ENDPOINT");
+ endpointVar->set_value("unix://" + endpointPath);
+
+ ContainerInfo containerInfo;
+
+ if (config->has_container()) {
+ containerInfo.CopyFrom(config->container());
+ } else {
+ containerInfo.set_type(ContainerInfo::MESOS);
+ }
+
+ // Prepare a volume where the endpoint socket will be placed.
+ const string endpointDir = Path(endpointPath).dirname();
+ Volume* endpointVolume = containerInfo.add_volumes();
+ endpointVolume->set_mode(Volume::RW);
+ endpointVolume->set_container_path(endpointDir);
+ endpointVolume->set_host_path(endpointDir);
+
+ // Prepare the directory where the mount points will be placed.
+ const string mountDir = csi::paths::getMountRootDir(
+ slave::paths::getCsiRootDir(workDir),
+ info.storage().type(),
+ info.storage().name());
+
+ Try<Nothing> mkdir = os::mkdir(mountDir);
+ if (mkdir.isError()) {
+ return Failure(
+ "Failed to create directory '" + mountDir +
+ "': " + mkdir.error());
+ }
+
+ // Prepare a volume where the mount points will be placed.
+ Volume* mountVolume = containerInfo.add_volumes();
+ mountVolume->set_mode(Volume::RW);
+ mountVolume->set_container_path(mountDir);
+ mountVolume->mutable_source()->set_type(Volume::Source::HOST_PATH);
+ mountVolume->mutable_source()->mutable_host_path()->set_path(mountDir);
+ mountVolume->mutable_source()->mutable_host_path()
+ ->mutable_mount_propagation()->set_mode(MountPropagation::BIDIRECTIONAL);
+
+ CHECK(!services.contains(containerId));
+ services[containerId].reset(new Promise<csi::Client>());
+
+ Try<Owned<ContainerDaemon>> daemon = ContainerDaemon::create(
+ extractParentEndpoint(url),
+ authToken,
+ containerId,
+ commandInfo,
+ config->resources(),
+ containerInfo,
+ std::function<Future<Nothing>()>(defer(self(), [=]() {
+ CHECK(services.at(containerId)->future().isPending());
+
+ return connect(endpointPath)
+ .then(defer(self(), [=](const csi::Client& client) {
+ services.at(containerId)->set(client);
+ return Nothing();
+ }))
+ .onFailed(defer(self(), [=](const string& failure) {
+ services.at(containerId)->fail(failure);
+ }))
+ .onDiscarded(defer(self(), [=] {
+ services.at(containerId)->discard();
+ }));
+ })),
+ std::function<Future<Nothing>()>(defer(self(), [=]() -> Future<Nothing> {
+ services.at(containerId)->discard();
+ services.at(containerId).reset(new Promise<csi::Client>());
+
+ if (os::exists(endpointPath)) {
+ Try<Nothing> rm = os::rm(endpointPath);
+ if (rm.isError()) {
+ return Failure(
+ "Failed to remove endpoint '" + endpointPath +
+ "': " + rm.error());
+ }
+ }
+
+ return Nothing();
+ })));
+
+ if (daemon.isError()) {
+ return Failure(
+ "Failed to create container daemon for plugin container '" +
+ stringify(containerId) + "': " + daemon.error());
+ }
+
+ const string message =
+ "Container daemon for '" + stringify(containerId) + "' failed";
+
+ daemons[containerId] = daemon.get();
+ daemon.get()->wait()
+ .onFailed(defer(self(), &Self::fatal, message, lambda::_1))
+ .onDiscarded(defer(self(), &Self::fatal, message, "future discarded"));
+
+ return services.at(containerId)->future();
+}
+
+
Try<Owned<LocalResourceProvider>> StorageLocalResourceProvider::create(
const http::URL& url,
const string& workDir,
http://git-wip-us.apache.org/repos/asf/mesos/blob/434a0941/src/v1/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/v1/mesos.cpp b/src/v1/mesos.cpp
index 15eb848..8369d19 100644
--- a/src/v1/mesos.cpp
+++ b/src/v1/mesos.cpp
@@ -651,6 +651,14 @@ ostream& operator<<(ostream& stream, const CheckInfo::Type& type)
ostream& operator<<(
ostream& stream,
+ const CSIPluginContainerInfo::Service& service)
+{
+ return stream << CSIPluginContainerInfo::Service_Name(service);
+}
+
+
+ostream& operator<<(
+ ostream& stream,
const FrameworkInfo::Capability& capability)
{
return stream << FrameworkInfo::Capability::Type_Name(capability.type());