Posted to commits@mesos.apache.org by ch...@apache.org on 2019/04/09 20:35:59 UTC

[mesos] branch 1.8.x updated (0baa382 -> f6159cb)

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a change to branch 1.8.x
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from 0baa382  Added MESOS-9667 to the 1.8.0 CHANGELOG.
     new 315340f  Fixed a container ID generation issue in the CSI service manager.
     new 20f076b  Auto-detect CSI API version to create the proper volume manager.
     new f6159cb  Added MESOS-9624 and MESOS-9626 to the 1.8.0 CHANGELOG.

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 CHANGELOG                                          |   2 +
 src/csi/service_manager.cpp                        | 194 ++++++++++++++++-----
 src/csi/service_manager.hpp                        |   1 +
 src/csi/v0_volume_manager.cpp                      |  28 +--
 src/csi/v0_volume_manager.hpp                      |   4 +-
 src/csi/v0_volume_manager_process.hpp              |  10 +-
 src/csi/v1_volume_manager.cpp                      |  28 +--
 src/csi/v1_volume_manager.hpp                      |   4 +-
 src/csi/v1_volume_manager_process.hpp              |  10 +-
 src/csi/volume_manager.cpp                         |  31 ++--
 src/csi/volume_manager.hpp                         |   7 +-
 src/resource_provider/storage/provider.cpp         |  77 +++++---
 .../storage_local_resource_provider_tests.cpp      |   8 +
 13 files changed, 255 insertions(+), 149 deletions(-)
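
Of the three revisions summarized above, 20f076b also changes ownership: the
storage local resource provider now owns the `ServiceManager`, detects the API
version, and hands a raw pointer to whichever `VolumeManager` it creates. The
following is a minimal standalone sketch of that shape, not the Mesos code.
Every type here is a simplified stand-in, the "v0"/"v1" strings are assumed
placeholders for `v0::API_VERSION` and `v1::API_VERSION`, and a null pointer
stands in for the `Error` that the real `VolumeManager::create` returns.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Records destruction order so the ownership note can be checked.
std::vector<std::string>& destroyed()
{
  static std::vector<std::string> log;
  return log;
}

// Stand-in for the CSI service manager.
struct ServiceManager
{
  ~ServiceManager() { destroyed().push_back("ServiceManager"); }
};

// Stand-in for the version-agnostic volume manager interface.
class VolumeManager
{
public:
  virtual ~VolumeManager() { destroyed().push_back("VolumeManager"); }
  virtual std::string apiVersion() const = 0;

protected:
  explicit VolumeManager(ServiceManager* _serviceManager)
    : serviceManager(_serviceManager) {}

  ServiceManager* serviceManager; // Non-owning; must outlive this object.
};

class V0VolumeManager : public VolumeManager
{
public:
  explicit V0VolumeManager(ServiceManager* s) : VolumeManager(s) {}
  std::string apiVersion() const override { return "v0"; }
};

class V1VolumeManager : public VolumeManager
{
public:
  explicit V1VolumeManager(ServiceManager* s) : VolumeManager(s) {}
  std::string apiVersion() const override { return "v1"; }
};

// Picks the implementation from the detected version string. Returns a null
// pointer for an unsupported version (the real code returns an `Error`).
std::unique_ptr<VolumeManager> createVolumeManager(
    const std::string& apiVersion,
    ServiceManager* serviceManager)
{
  if (apiVersion == "v0") {
    return std::unique_ptr<VolumeManager>(new V0VolumeManager(serviceManager));
  } else if (apiVersion == "v1") {
    return std::unique_ptr<VolumeManager>(new V1VolumeManager(serviceManager));
  }

  return nullptr;
}

// Members are destroyed in reverse declaration order, so declaring
// `serviceManager` first keeps it alive until `volumeManager` is gone.
struct Provider
{
  std::unique_ptr<ServiceManager> serviceManager;
  std::unique_ptr<VolumeManager> volumeManager;
};
```

Declaring the owner of the pointed-to object first is what makes the raw
pointer safe in this sketch; swapping the two members of `Provider` would
destroy the service manager while the volume manager still refers to it.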


[mesos] 03/03: Added MESOS-9624 and MESOS-9626 to the 1.8.0 CHANGELOG.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch 1.8.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit f6159cb1caab84c6d3725b29b3f46a1c40a21b2b
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Tue Apr 9 13:14:56 2019 -0700

    Added MESOS-9624 and MESOS-9626 to the 1.8.0 CHANGELOG.
---
 CHANGELOG | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/CHANGELOG b/CHANGELOG
index a4689d2..eec6093 100644
--- a/CHANGELOG
+++ b/CHANGELOG
@@ -295,7 +295,9 @@ All Resolved Issues:
   * [MESOS-9620] - Add metrics for volume gid manager
   * [MESOS-9622] - Refactor SLRP with a CSI volume manager.
   * [MESOS-9623] - Implement CSI volume manager with CSI v1.
+  * [MESOS-9624] - Bundle CSI spec v1.0 in Mesos.
   * [MESOS-9625] - Make `DiskProfileAdaptor` agnostic to CSI spec version.
+  * [MESOS-9626] - Make SLRP pick the appropriate CSI versions for plugins.
   * [MESOS-9632] - Refactor SLRP with a CSI service manager.
   * [MESOS-9639] - Make CSI plugin RPC metrics agnostic to CSI versions.
   * [MESOS-9648] - Make operation reconciliation send asynchronous updates


[mesos] 02/03: Auto-detect CSI API version to create the proper volume manager.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch 1.8.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 20f076b28e3eaf06f7c942c6117a26d4dad8ed32
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Mon Apr 8 13:49:27 2019 -0700

    Auto-detect CSI API version to create the proper volume manager.
    
    The `mesos::csi::VolumeManager::create` function now first creates a
    `ServiceManager` to detect the API version, then instantiates the proper
    `VolumeManager` based on it.
    
    NOTE: This patch enables CSI v1 by default for all SLRP-related unit
    tests except for test `RetryRpcWithExponentialBackoff`.
    
    Review: https://reviews.apache.org/r/70428
---
 src/csi/service_manager.cpp                        | 182 +++++++++++++++------
 src/csi/service_manager.hpp                        |   1 +
 src/csi/v0_volume_manager.cpp                      |  28 +---
 src/csi/v0_volume_manager.hpp                      |   4 +-
 src/csi/v0_volume_manager_process.hpp              |  10 +-
 src/csi/v1_volume_manager.cpp                      |  28 +---
 src/csi/v1_volume_manager.hpp                      |   4 +-
 src/csi/v1_volume_manager_process.hpp              |  10 +-
 src/csi/volume_manager.cpp                         |  31 ++--
 src/csi/volume_manager.hpp                         |   7 +-
 src/resource_provider/storage/provider.cpp         |  77 ++++++---
 .../storage_local_resource_provider_tests.cpp      |   8 +
 12 files changed, 243 insertions(+), 147 deletions(-)
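
The detection order that `probeEndpoint` introduces in the diff below can be
sketched in standalone C++ as follows. This is a simplified illustration under
stated assumptions, not the Mesos implementation: it is synchronous rather than
future-based, `ProbeStatus`, `Prober`, `Detection` and `detectApiVersion` are
hypothetical names, and the "v0"/"v1" strings are placeholders for
`v0::API_VERSION` and `v1::API_VERSION`.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>
#include <vector>

// Hypothetical outcome of one versioned `Probe` call.
enum class ProbeStatus { OK, UNIMPLEMENTED, FAILED };

// Stand-in for a prober: given an endpoint, report how the plugin answered.
using Prober = std::function<ProbeStatus(const std::string& endpoint)>;

struct Detection
{
  bool ok;           // True if a version was detected.
  std::string value; // The version on success, an error message otherwise.
};

// If `known` is non-empty, only that version is probed. Otherwise v1 is
// tried first, and v0 is tried only when v1 reports "unimplemented". Any
// other failure is returned immediately without falling back.
Detection detectApiVersion(
    const std::map<std::string, Prober>& probers,
    const std::string& endpoint,
    const std::string& known = "")
{
  const std::vector<std::string> order =
    known.empty() ? std::vector<std::string>{"v1", "v0"}
                  : std::vector<std::string>{known};

  for (const std::string& version : order) {
    switch (probers.at(version)(endpoint)) {
      case ProbeStatus::OK:
        return Detection{true, version};
      case ProbeStatus::FAILED:
        return Detection{false, "Probe with " + version + " failed"};
      case ProbeStatus::UNIMPLEMENTED:
        break; // Leave the switch and try the next version, if any.
    }
  }

  return Detection{false, "Unknown API version"};
}
```

Distinguishing "unimplemented" from other failures is the point of the design:
a plugin that speaks v1 but fails its probe is reported as failed instead of
being silently retried with v0.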

diff --git a/src/csi/service_manager.cpp b/src/csi/service_manager.cpp
index d6d0d4a..a87df96 100644
--- a/src/csi/service_manager.cpp
+++ b/src/csi/service_manager.cpp
@@ -52,6 +52,7 @@
 
 #include "csi/paths.hpp"
 #include "csi/v0_client.hpp"
+#include "csi/v1_client.hpp"
 
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
@@ -139,6 +140,7 @@ public:
   Future<Nothing> recover();
 
   Future<string> getServiceEndpoint(const Service& service);
+  Future<string> getApiVersion();
 
 private:
   // Returns the container info of the specified container for this CSI plugin.
@@ -155,9 +157,12 @@ private:
   // Kills the specified plugin container.
   Future<Nothing> killContainer(const ContainerID& containerId);
 
-  // Waits for the endpoint (URI to a Unix domain socket) to be ready.
+  // Waits for the endpoint (URI to a Unix domain socket) to be created.
   Future<Nothing> waitEndpoint(const string& endpoint);
 
+  // Probes the endpoint to detect the API version and check for readiness.
+  Future<Nothing> probeEndpoint(const string& endpoint);
+
   // Returns the URI of the latest service endpoint for the specified plugin
   // container. If the container is not already running, this method will start
   // a new container.
@@ -174,6 +179,7 @@ private:
   Metrics* metrics;
 
   http::Headers headers;
+  Option<string> apiVersion;
   hashmap<Service, ContainerID> serviceContainers;
 
   hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
@@ -350,6 +356,20 @@ Future<string> ServiceManagerProcess::getServiceEndpoint(const Service& service)
 }
 
 
+Future<string> ServiceManagerProcess::getApiVersion()
+{
+  if (apiVersion.isSome()) {
+    return apiVersion.get();
+  }
+
+  // Ensure that the plugin has been probed (which does the API version
+  // detection) through `getEndpoint` before returning the API version.
+  CHECK(!serviceContainers.empty());
+  return getEndpoint(serviceContainers.begin()->second)
+    .then(process::defer(self(), [=] { return CHECK_NOTNONE(apiVersion); }));
+}
+
+
 Option<CSIPluginContainerInfo> ServiceManagerProcess::getContainerInfo(
     const ContainerID& containerId)
 {
@@ -384,8 +404,8 @@ ServiceManagerProcess::getContainers()
             httpResponse.status + "' (" + httpResponse.body + ")");
       }
 
-      Try<v1::agent::Response> v1Response =
-        internal::deserialize<v1::agent::Response>(
+      Try<mesos::v1::agent::Response> v1Response =
+        internal::deserialize<mesos::v1::agent::Response>(
             contentType, httpResponse.body);
 
       if (v1Response.isError()) {
@@ -473,55 +493,117 @@ Future<Nothing> ServiceManagerProcess::waitEndpoint(const string& endpoint)
   const string endpointPath =
     strings::remove(endpoint, "unix://", strings::PREFIX);
 
-  Future<Nothing> created = Nothing();
-  if (!os::exists(endpointPath)) {
-    // Wait for the endpoint socket to appear until the timeout expires.
-    Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
+  if (os::exists(endpointPath)) {
+    return Nothing();
+  }
 
-    created = process::loop(
-        [=]() -> Future<Nothing> {
-          if (timeout.expired()) {
-            return Failure("Timed out waiting for endpoint '" + endpoint + "'");
-          }
+  // Wait for the endpoint socket to appear until the timeout expires.
+  Timeout timeout = Timeout::in(CSI_ENDPOINT_CREATION_TIMEOUT);
 
-          return process::after(Milliseconds(10));
-        },
-        [=](const Nothing&) -> ControlFlow<Nothing> {
-          if (os::exists(endpointPath)) {
-            return Break();
-          }
+  return process::loop(
+      [=]() -> Future<Nothing> {
+        if (timeout.expired()) {
+          return Failure("Timed out waiting for endpoint '" + endpoint + "'");
+        }
 
-          return Continue();
-        });
+        return process::after(Milliseconds(10));
+      },
+      [=](const Nothing&) -> ControlFlow<Nothing> {
+        if (os::exists(endpointPath)) {
+          return Break();
+        }
+
+        return Continue();
+      });
+}
+
+
+Future<Nothing> ServiceManagerProcess::probeEndpoint(const string& endpoint)
+{
+  // Each probe function returns its API version if the probe is successful,
+  // an error if the API version is implemented but the probe fails, or a `None`
+  // if the API version is not implemented.
+  static const hashmap<
+      string,
+      std::function<Future<Result<string>>(const string&, const Runtime&)>>
+    probers = {
+      {v0::API_VERSION,
+       [](const string& endpoint, const Runtime& runtime) {
+         LOG(INFO) << "Probing endpoint '" << endpoint << "' with CSI v0";
+
+         return v0::Client(endpoint, runtime)
+           .probe(v0::ProbeRequest())
+           .then([](const v0::RPCResult<v0::ProbeResponse>& result) {
+             return result.isError()
+               ? (result.error().status.error_code() == grpc::UNIMPLEMENTED
+                    ? Result<string>::none() : result.error())
+               : v0::API_VERSION;
+           });
+       }},
+      {v1::API_VERSION,
+       [](const string& endpoint, const Runtime& runtime) {
+         LOG(INFO) << "Probing endpoint '" << endpoint << "' with CSI v1";
+
+         return v1::Client(endpoint, runtime).probe(v1::ProbeRequest())
+           .then([](const v1::RPCResult<v1::ProbeResponse>& result) {
+             // TODO(chhsiao): Retry when `result->ready` is false.
+             return result.isError()
+               ? (result.error().status.error_code() == grpc::UNIMPLEMENTED
+                    ? Result<string>::none() : result.error())
+               : v1::API_VERSION;
+           });
+       }},
+    };
+
+  ++metrics->csi_plugin_rpcs_pending;
+
+  Future<Result<string>> probed;
+
+  if (apiVersion.isSome()) {
+    CHECK(probers.contains(apiVersion.get()));
+    probed = probers.at(apiVersion.get())(endpoint, runtime);
+  } else {
+    probed = probers.at(v1::API_VERSION)(endpoint, runtime)
+      .then(process::defer(self(), [=](const Result<string>& result) {
+        return result.isNone()
+          ? probers.at(v0::API_VERSION)(endpoint, runtime) : result;
+      }));
   }
 
-  return created
-    .then(process::defer(self(), [=]() -> Future<Nothing> {
-      // TODO(chhsiao): Detect which CSI version to use through versioned
-      // `Probe` calls to support CSI v1 in a backward compatible way.
-      ++metrics->csi_plugin_rpcs_pending;
-
-      return v0::Client(endpoint, runtime).probe(v0::ProbeRequest())
-        .then(process::defer(self(), [=](
-            const v0::RPCResult<v0::ProbeResponse>& result) -> Future<Nothing> {
-          if (result.isError()) {
-            return Failure(
-                "Failed to probe endpoint '" + endpoint +
-                "': " + stringify(result.error()));
-          }
+  return probed
+    .then(process::defer(self(), [=](
+        const Result<string>& result) -> Future<Nothing> {
+      if (result.isError()) {
+        return Failure(
+            "Failed to probe endpoint '" + endpoint + "': " + result.error());
+      }
 
-          return Nothing();
-        }))
-        .onAny(process::defer(self(), [this](const Future<Nothing>& future) {
-          --metrics->csi_plugin_rpcs_pending;
-          if (future.isReady()) {
-            ++metrics->csi_plugin_rpcs_finished;
-          } else if (future.isDiscarded()) {
-            ++metrics->csi_plugin_rpcs_cancelled;
-          } else {
-            ++metrics->csi_plugin_rpcs_failed;
-          }
-        }));
+      if (result.isNone()) {
+        return Failure(
+            "Failed to probe endpoint '" + endpoint + "': Unknown API version");
+      }
+
+      if (apiVersion.isNone()) {
+        apiVersion = result.get();
+      } else if (apiVersion != result.get()) {
+        return Failure(
+            "Failed to probe endpoint '" + endpoint +
+            "': Inconsistent API version");
+      }
+
+      return Nothing();
+    }))
+    .onAny(process::defer(self(), [this](const Future<Nothing>& future) {
+      // We only update the metrics after the whole detection loop is done so
+      // it won't introduce much noise.
+      --metrics->csi_plugin_rpcs_pending;
+      if (future.isReady()) {
+        ++metrics->csi_plugin_rpcs_finished;
+      } else if (future.isDiscarded()) {
+        ++metrics->csi_plugin_rpcs_cancelled;
+      } else {
+        ++metrics->csi_plugin_rpcs_failed;
+      }
     }));
 }
 
@@ -624,6 +706,7 @@ Future<string> ServiceManagerProcess::getEndpoint(
 
             CHECK(endpoints.at(containerId)->associate(
                 waitEndpoint(endpoint)
+                  .then(process::defer(self(), &Self::probeEndpoint, endpoint))
                   .then([endpoint]() -> string { return endpoint; })));
 
             return endpoints.at(containerId)->future().then([] {
@@ -728,5 +811,12 @@ Future<string> ServiceManager::getServiceEndpoint(const Service& service)
         process.get(), &ServiceManagerProcess::getServiceEndpoint, service));
 }
 
+
+Future<string> ServiceManager::getApiVersion()
+{
+  return recovered
+    .then(process::defer(process.get(), &ServiceManagerProcess::getApiVersion));
+}
+
 } // namespace csi {
 } // namespace mesos {
diff --git a/src/csi/service_manager.hpp b/src/csi/service_manager.hpp
index 5dd6bc7..60a0805 100644
--- a/src/csi/service_manager.hpp
+++ b/src/csi/service_manager.hpp
@@ -73,6 +73,7 @@ public:
   process::Future<Nothing> recover();
 
   process::Future<std::string> getServiceEndpoint(const Service& service);
+  process::Future<std::string> getApiVersion();
 
 private:
   process::Owned<ServiceManagerProcess> process;
diff --git a/src/csi/v0_volume_manager.cpp b/src/csi/v0_volume_manager.cpp
index 02de17e..e19dc7c 100644
--- a/src/csi/v0_volume_manager.cpp
+++ b/src/csi/v0_volume_manager.cpp
@@ -63,6 +63,7 @@ using process::Continue;
 using process::ControlFlow;
 using process::Failure;
 using process::Future;
+using process::Owned;
 using process::ProcessBase;
 
 using process::grpc::StatusError;
@@ -74,29 +75,19 @@ namespace csi {
 namespace v0 {
 
 VolumeManagerProcess::VolumeManagerProcess(
-    const http::URL& agentUrl,
     const string& _rootDir,
     const CSIPluginInfo& _info,
     const hashset<Service> _services,
-    const string& containerPrefix,
-    const Option<string>& authToken,
     const Runtime& _runtime,
+    ServiceManager* _serviceManager,
     Metrics* _metrics)
   : ProcessBase(process::ID::generate("csi-v0-volume-manager")),
     rootDir(_rootDir),
     info(_info),
     services(_services),
     runtime(_runtime),
-    metrics(_metrics),
-    serviceManager(new ServiceManager(
-        agentUrl,
-        rootDir,
-        info,
-        services,
-        containerPrefix,
-        authToken,
-        runtime,
-        metrics))
+    serviceManager(_serviceManager),
+    metrics(_metrics)
 {
   // This should have been validated in `VolumeManager::create`.
   CHECK(!services.empty())
@@ -114,8 +105,7 @@ Future<Nothing> VolumeManagerProcess::recover()
 
   bootId = bootId_.get();
 
-  return serviceManager->recover()
-    .then(process::defer(self(), &Self::prepareServices))
+  return prepareServices()
     .then(process::defer(self(), [this]() -> Future<Nothing> {
       // Recover the states of CSI volumes.
       Try<list<string>> volumePaths =
@@ -1198,22 +1188,18 @@ void VolumeManagerProcess::garbageCollectMountPath(const string& volumeId)
 
 
 VolumeManager::VolumeManager(
-    const http::URL& agentUrl,
     const string& rootDir,
     const CSIPluginInfo& info,
     const hashset<Service>& services,
-    const string& containerPrefix,
-    const Option<string>& authToken,
     const Runtime& runtime,
+    ServiceManager* serviceManager,
     Metrics* metrics)
   : process(new VolumeManagerProcess(
-        agentUrl,
         rootDir,
         info,
         services,
-        containerPrefix,
-        authToken,
         runtime,
+        serviceManager,
         metrics))
 {
   process::spawn(CHECK_NOTNULL(process.get()));
diff --git a/src/csi/v0_volume_manager.hpp b/src/csi/v0_volume_manager.hpp
index 6c15f29..9d572e7 100644
--- a/src/csi/v0_volume_manager.hpp
+++ b/src/csi/v0_volume_manager.hpp
@@ -53,13 +53,11 @@ class VolumeManager : public csi::VolumeManager
 {
 public:
   VolumeManager(
-      const process::http::URL& agentUrl,
       const std::string& rootDir,
       const CSIPluginInfo& info,
       const hashset<Service>& services,
-      const std::string& containerPrefix,
-      const Option<std::string>& authToken,
       const process::grpc::client::Runtime& runtime,
+      ServiceManager* serviceManager,
       Metrics* metrics);
 
   // Since this class contains `Owned` members which should not but can be
diff --git a/src/csi/v0_volume_manager_process.hpp b/src/csi/v0_volume_manager_process.hpp
index c3cd6ca..4cfb5b5 100644
--- a/src/csi/v0_volume_manager_process.hpp
+++ b/src/csi/v0_volume_manager_process.hpp
@@ -70,13 +70,11 @@ class VolumeManagerProcess : public process::Process<VolumeManagerProcess>
 {
 public:
   explicit VolumeManagerProcess(
-      const process::http::URL& agentUrl,
       const std::string& _rootDir,
       const CSIPluginInfo& _info,
       const hashset<Service> _services,
-      const std::string& containerPrefix,
-      const Option<std::string>& authToken,
       const process::grpc::client::Runtime& _runtime,
+      ServiceManager* _serviceManager,
       Metrics* _metrics);
 
   process::Future<Nothing> recover();
@@ -146,8 +144,8 @@ private:
   //
   //                          +------------+
   //                 +  +  +  |  CREATED   |  ^
-  //   _attachVolume |  |  |  +---+----^---+  |
-  //                 |  |  |      |    |      | _detachVolume
+  //                 |  |  |  +---+----^---+  |
+  //   _attachVolume |  |  |      |    |      | _detachVolume
   //                 |  |  |  +---v----+---+  |
   //                 v  +  +  | NODE_READY |  +  ^
   //                    |  |  +---+----^---+  |  |
@@ -187,8 +185,8 @@ private:
   const hashset<Service> services;
 
   process::grpc::client::Runtime runtime;
+  ServiceManager* serviceManager;
   Metrics* metrics;
-  process::Owned<ServiceManager> serviceManager;
 
   Option<std::string> bootId;
   Option<PluginCapabilities> pluginCapabilities;
diff --git a/src/csi/v1_volume_manager.cpp b/src/csi/v1_volume_manager.cpp
index bd334f1..bf640f9 100644
--- a/src/csi/v1_volume_manager.cpp
+++ b/src/csi/v1_volume_manager.cpp
@@ -64,6 +64,7 @@ using process::Continue;
 using process::ControlFlow;
 using process::Failure;
 using process::Future;
+using process::Owned;
 using process::ProcessBase;
 
 using process::grpc::StatusError;
@@ -75,29 +76,19 @@ namespace csi {
 namespace v1 {
 
 VolumeManagerProcess::VolumeManagerProcess(
-    const http::URL& agentUrl,
     const string& _rootDir,
     const CSIPluginInfo& _info,
     const hashset<Service> _services,
-    const string& containerPrefix,
-    const Option<string>& authToken,
     const Runtime& _runtime,
+    ServiceManager* _serviceManager,
     Metrics* _metrics)
   : ProcessBase(process::ID::generate("csi-v1-volume-manager")),
     rootDir(_rootDir),
     info(_info),
     services(_services),
     runtime(_runtime),
-    metrics(_metrics),
-    serviceManager(new ServiceManager(
-        agentUrl,
-        rootDir,
-        info,
-        services,
-        containerPrefix,
-        authToken,
-        runtime,
-        metrics))
+    serviceManager(_serviceManager),
+    metrics(_metrics)
 {
   // This should have been validated in `VolumeManager::create`.
   CHECK(!services.empty())
@@ -115,8 +106,7 @@ Future<Nothing> VolumeManagerProcess::recover()
 
   bootId = bootId_.get();
 
-  return serviceManager->recover()
-    .then(process::defer(self(), &Self::prepareServices))
+  return prepareServices()
     .then(process::defer(self(), [this]() -> Future<Nothing> {
       // Recover the states of CSI volumes.
       Try<list<string>> volumePaths =
@@ -1224,22 +1214,18 @@ void VolumeManagerProcess::garbageCollectMountPath(const string& volumeId)
 
 
 VolumeManager::VolumeManager(
-    const http::URL& agentUrl,
     const string& rootDir,
     const CSIPluginInfo& info,
     const hashset<Service>& services,
-    const string& containerPrefix,
-    const Option<string>& authToken,
     const Runtime& runtime,
+    ServiceManager* serviceManager,
     Metrics* metrics)
   : process(new VolumeManagerProcess(
-        agentUrl,
         rootDir,
         info,
         services,
-        containerPrefix,
-        authToken,
         runtime,
+        serviceManager,
         metrics))
 {
   process::spawn(CHECK_NOTNULL(process.get()));
diff --git a/src/csi/v1_volume_manager.hpp b/src/csi/v1_volume_manager.hpp
index f8e6095..ba984a9 100644
--- a/src/csi/v1_volume_manager.hpp
+++ b/src/csi/v1_volume_manager.hpp
@@ -53,13 +53,11 @@ class VolumeManager : public csi::VolumeManager
 {
 public:
   VolumeManager(
-      const process::http::URL& agentUrl,
       const std::string& rootDir,
       const CSIPluginInfo& info,
       const hashset<Service>& services,
-      const std::string& containerPrefix,
-      const Option<std::string>& authToken,
       const process::grpc::client::Runtime& runtime,
+      ServiceManager* serviceManager,
       Metrics* metrics);
 
   // Since this class contains `Owned` members which should not but can be
diff --git a/src/csi/v1_volume_manager_process.hpp b/src/csi/v1_volume_manager_process.hpp
index 1c80399..30788c3 100644
--- a/src/csi/v1_volume_manager_process.hpp
+++ b/src/csi/v1_volume_manager_process.hpp
@@ -70,13 +70,11 @@ class VolumeManagerProcess : public process::Process<VolumeManagerProcess>
 {
 public:
   explicit VolumeManagerProcess(
-      const process::http::URL& agentUrl,
       const std::string& _rootDir,
       const CSIPluginInfo& _info,
       const hashset<Service> _services,
-      const std::string& containerPrefix,
-      const Option<std::string>& authToken,
       const process::grpc::client::Runtime& _runtime,
+      ServiceManager* _serviceManager,
       Metrics* _metrics);
 
   process::Future<Nothing> recover();
@@ -146,8 +144,8 @@ private:
   //
   //                          +------------+
   //                 +  +  +  |  CREATED   |  ^
-  //   _attachVolume |  |  |  +---+----^---+  |
-  //                 |  |  |      |    |      | _detachVolume
+  //                 |  |  |  +---+----^---+  |
+  //   _attachVolume |  |  |      |    |      | _detachVolume
   //                 |  |  |  +---v----+---+  |
   //                 v  +  +  | NODE_READY |  +  ^
   //                    |  |  +---+----^---+  |  |
@@ -187,8 +185,8 @@ private:
   const hashset<Service> services;
 
   process::grpc::client::Runtime runtime;
+  ServiceManager* serviceManager;
   Metrics* metrics;
-  process::Owned<ServiceManager> serviceManager;
 
   Option<std::string> bootId;
   Option<PluginCapabilities> pluginCapabilities;
diff --git a/src/csi/volume_manager.cpp b/src/csi/volume_manager.cpp
index cbe45cb..c47adfe 100644
--- a/src/csi/volume_manager.cpp
+++ b/src/csi/volume_manager.cpp
@@ -16,9 +16,14 @@
 
 #include "csi/volume_manager.hpp"
 
-#include <process/grpc.hpp>
+#include <memory>
 
+#include <mesos/csi/v0.hpp>
+#include <mesos/csi/v1.hpp>
+
+#include "csi/service_manager.hpp"
 #include "csi/v0_volume_manager.hpp"
+#include "csi/v1_volume_manager.hpp"
 
 namespace http = process::http;
 
@@ -32,12 +37,12 @@ namespace mesos {
 namespace csi {
 
 Try<Owned<VolumeManager>> VolumeManager::create(
-    const http::URL& agentUrl,
     const string& rootDir,
     const CSIPluginInfo& info,
     const hashset<Service>& services,
-    const string& containerPrefix,
-    const Option<string>& authToken,
+    const string& apiVersion,
+    const Runtime& runtime,
+    ServiceManager* serviceManager,
     Metrics* metrics)
 {
   if (services.empty()) {
@@ -46,15 +51,15 @@ Try<Owned<VolumeManager>> VolumeManager::create(
         info.type() + "' and name '" + info.name() + "'");
   }
 
-  return new v0::VolumeManager(
-      agentUrl,
-      rootDir,
-      info,
-      services,
-      containerPrefix,
-      authToken,
-      Runtime(),
-      metrics);
+  if (apiVersion == v0::API_VERSION) {
+    return Try<Owned<VolumeManager>>(new v0::VolumeManager(
+        rootDir, info, services, runtime, serviceManager, metrics));
+  } else if (apiVersion == v1::API_VERSION) {
+    return Try<Owned<VolumeManager>>(new v1::VolumeManager(
+        rootDir, info, services, runtime, serviceManager, metrics));
+  }
+
+  return Error("Unsupported CSI API version: " + apiVersion);
 }
 
 } // namespace csi {
diff --git a/src/csi/volume_manager.hpp b/src/csi/volume_manager.hpp
index cc20f46..0aa6337 100644
--- a/src/csi/volume_manager.hpp
+++ b/src/csi/volume_manager.hpp
@@ -27,6 +27,7 @@
 #include <mesos/csi/types.hpp>
 
 #include <process/future.hpp>
+#include <process/grpc.hpp>
 #include <process/http.hpp>
 #include <process/owned.hpp>
 
@@ -56,12 +57,12 @@ class VolumeManager
 {
 public:
   static Try<process::Owned<VolumeManager>> create(
-      const process::http::URL& agentUrl,
       const std::string& rootDir,
       const CSIPluginInfo& info,
       const hashset<Service>& services,
-      const std::string& containerPrefix,
-      const Option<std::string>& authToken,
+      const std::string& apiVersion,
+      const process::grpc::client::Runtime& runtime,
+      ServiceManager* serviceManager,
       Metrics* metrics);
 
   virtual ~VolumeManager() = default;
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 2dc5c26..b2ca5d0 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -72,6 +72,7 @@
 
 #include "csi/metrics.hpp"
 #include "csi/paths.hpp"
+#include "csi/service_manager.hpp"
 #include "csi/volume_manager.hpp"
 
 #include "internal/devolve.hpp"
@@ -113,11 +114,14 @@ using process::spawn;
 
 using process::grpc::StatusError;
 
+using process::grpc::client::Runtime;
+
 using process::http::authentication::Principal;
 
 using process::metrics::Counter;
 using process::metrics::PushGauge;
 
+using mesos::csi::ServiceManager;
 using mesos::csi::VolumeInfo;
 using mesos::csi::VolumeManager;
 
@@ -180,6 +184,14 @@ static inline http::URL extractParentEndpoint(const http::URL& url)
 }
 
 
+static string getContainerPrefix(const ResourceProviderInfo& info)
+{
+  const Principal principal = LocalResourceProvider::principal(info);
+  CHECK(principal.claims.contains("cid_prefix"));
+  return principal.claims.at("cid_prefix");
+}
+
+
 static inline Resource createRawDiskResource(
     const ResourceProviderInfo& info,
     const Bytes& capacity,
@@ -363,6 +375,11 @@ private:
   // The mapping of known profiles fetched from the DiskProfileAdaptor.
   hashmap<string, DiskProfileAdaptor::ProfileInfo> profileInfos;
 
+  Runtime runtime;
+
+  // NOTE: `serviceManager` must be destructed after `volumeManager` since the
+  // latter holds a pointer of the former.
+  Owned<ServiceManager> serviceManager;
   Owned<VolumeManager> volumeManager;
 
   // We maintain the following invariant: if one operation depends on
@@ -494,30 +511,6 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
 
 void StorageLocalResourceProviderProcess::initialize()
 {
-  const Principal principal = LocalResourceProvider::principal(info);
-  CHECK(principal.claims.contains("cid_prefix"));
-  const string& containerPrefix = principal.claims.at("cid_prefix");
-
-  Try<Owned<VolumeManager>> volumeManager_ = VolumeManager::create(
-      extractParentEndpoint(url),
-      slave::paths::getCsiRootDir(workDir),
-      info.storage().plugin(),
-      {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
-      containerPrefix,
-      authToken,
-      &metrics);
-
-  if (volumeManager_.isError()) {
-    LOG(ERROR)
-      << "Failed to create CSI volume manager for resource provider with type '"
-      << info.type() << "' and name '" << info.name()
-      << "': " << volumeManager_.error();
-
-    fatal();
-  }
-
-  volumeManager = std::move(volumeManager_).get();
-
   auto die = [=](const string& message) {
     LOG(ERROR)
       << "Failed to recover resource provider with type '" << info.type()
@@ -547,7 +540,41 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 {
   CHECK_EQ(RECOVERING, state);
 
-  return volumeManager->recover()
+  serviceManager.reset(new ServiceManager(
+      extractParentEndpoint(url),
+      slave::paths::getCsiRootDir(workDir),
+      info.storage().plugin(),
+      {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
+      getContainerPrefix(info),
+      authToken,
+      runtime,
+      &metrics));
+
+  return serviceManager->recover()
+    .then(defer(self(), [=] {
+      return serviceManager->getApiVersion();
+    }))
+    .then(defer(self(), [=](const string& apiVersion) -> Future<Nothing> {
+      Try<Owned<VolumeManager>> volumeManager_ = VolumeManager::create(
+          slave::paths::getCsiRootDir(workDir),
+          info.storage().plugin(),
+          {csi::CONTROLLER_SERVICE, csi::NODE_SERVICE},
+          apiVersion,
+          runtime,
+          serviceManager.get(),
+          &metrics);
+
+      if (volumeManager_.isError()) {
+        return Failure(
+            "Failed to create CSI volume manager for resource provider with "
+            "type '" + info.type() + "' and name '" + info.name() + "': " +
+            volumeManager_.error());
+      }
+
+      volumeManager = std::move(volumeManager_.get());
+
+      return volumeManager->recover();
+    }))
     .then(defer(self(), [=]() -> Future<Nothing> {
       // Recover the resource provider ID and state from the latest symlink. If
       // the symlink does not exist, this is a new resource provider, and the
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index bd35150..8bf4d23 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -22,6 +22,9 @@
 #include <tuple>
 #include <vector>
 
+#include <mesos/csi/v0.hpp>
+#include <mesos/csi/v1.hpp>
+
 #include <process/clock.hpp>
 #include <process/collect.hpp>
 #include <process/future.hpp>
@@ -4795,6 +4798,11 @@ TEST_F(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
   MockCSIPlugin plugin;
   ASSERT_SOME(plugin.startup(mockCsiEndpoint));
 
+  // TODO(chhsiao): Since this test expects CSI v0 protobufs, we disable CSI v1
+  // for now. Remove this once the expectations are parameterized.
+  EXPECT_CALL(plugin, Probe(_, _, A<csi::v1::ProbeResponse*>()))
+    .WillRepeatedly(Return(grpc::Status(grpc::UNIMPLEMENTED, "")));
+
   EXPECT_CALL(plugin, GetCapacity(_, _, A<csi::v0::GetCapacityResponse*>()))
     .WillRepeatedly(Invoke([](
         grpc::ServerContext* context,


[mesos] 01/03: Fixed a container ID generation issue in the CSI service manager.

Posted by ch...@apache.org.

chhsiao pushed a commit to branch 1.8.x
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 315340f5c287fa72d35011ba7fedabf91a66514d
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
AuthorDate: Sat Apr 6 21:36:54 2019 -0700

    Fixed a container ID generation issue in the CSI service manager.
    
    Review: https://reviews.apache.org/r/70427
---
 src/csi/service_manager.cpp | 12 ++++++++++--
 1 file changed, 10 insertions(+), 2 deletions(-)
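
The commit message does not spell out the symptom, but the NOTE added in the
diff below says that `container.services()` returns `RepeatedField<int>`. A
plausible reading is that joining it directly yields raw integers rather than
service names. The standalone sketch below illustrates that difference under
assumptions: the `Service` enum values, the `operator<<` overload, and the
`join` and `containerIdSuffix` helpers are hypothetical stand-ins, not the
Mesos or stout definitions.

```cpp
#include <cassert>
#include <cstddef>
#include <sstream>
#include <string>
#include <vector>

// Assumed enum values, for illustration only.
enum Service { CONTROLLER_SERVICE = 1, NODE_SERVICE = 2 };

// Prints a service by name instead of by its integer value.
std::ostream& operator<<(std::ostream& stream, Service service)
{
  switch (service) {
    case CONTROLLER_SERVICE: return stream << "CONTROLLER_SERVICE";
    case NODE_SERVICE: return stream << "NODE_SERVICE";
  }
  return stream << "UNKNOWN";
}

// Joins items using each item's own stream operator.
template <typename T>
std::string join(const std::string& separator, const std::vector<T>& items)
{
  std::ostringstream out;
  for (std::size_t i = 0; i < items.size(); i++) {
    if (i > 0) {
      out << separator;
    }
    out << items[i];
  }
  return out.str();
}

// Rebuilds a typed list from raw integers before joining, mirroring the
// reconstruction loop in the patch.
std::string containerIdSuffix(const std::vector<int>& raw)
{
  std::vector<Service> services;
  services.reserve(raw.size());
  for (int value : raw) {
    services.push_back(static_cast<Service>(value));
  }

  return "--" + join("-", services);
}
```

With these stand-ins, joining the integer list gives "1-2", while the
reconstructed list gives "--CONTROLLER_SERVICE-NODE_SERVICE".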

diff --git a/src/csi/service_manager.cpp b/src/csi/service_manager.cpp
index 0a3663c..d6d0d4a 100644
--- a/src/csi/service_manager.cpp
+++ b/src/csi/service_manager.cpp
@@ -23,6 +23,7 @@
 #include <vector>
 
 #include <mesos/http.hpp>
+#include <mesos/type_utils.hpp>
 
 #include <mesos/agent/agent.hpp>
 
@@ -51,7 +52,6 @@
 
 #include "csi/paths.hpp"
 #include "csi/v0_client.hpp"
-#include "csi/v0_utils.hpp"
 
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
@@ -105,11 +105,19 @@ static ContainerID getContainerId(
     const string& containerPrefix,
     const CSIPluginContainerInfo& container)
 {
+  // NOTE: We cannot simply stringify `container.services()` since it returns
+  // `RepeatedField<int>`, so we reconstruct the list of services here.
+  vector<Service> services;
+  services.reserve(container.services_size());
+  for (int i = 0; i < container.services_size(); i++) {
+    services.push_back(container.services(i));
+  }
+
   ContainerID containerId;
   containerId.set_value(
       containerPrefix +
       strings::join("-", strings::replace(info.type(), ".", "-"), info.name()) +
-      "--" + strings::join("-", container.services()));
+      "--" + strings::join("-", services));
 
   return containerId;
 }