You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2018/02/09 00:24:12 UTC

[6/7] mesos git commit: Returned profiles based on provider selectors in UriDiskProfileAdaptor.

Returned profiles based on provider selectors in UriDiskProfileAdaptor.

Now the URI disk profile adaptor module will return the set of profiles
in which each profile is either known to a storage resource provider or
applies to it (based on the resource provider selector) when it watches
for profiles.

Review: https://reviews.apache.org/r/65566/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/58add5a2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/58add5a2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/58add5a2

Branch: refs/heads/master
Commit: 58add5a2c615f0dc5620da9a125ca121c632908c
Parents: 921f61f
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Thu Feb 8 14:41:31 2018 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Thu Feb 8 16:24:02 2018 -0800

----------------------------------------------------------------------
 .../storage/uri_disk_profile.cpp                | 75 +++++++++++++-------
 .../storage/uri_disk_profile.hpp                | 35 ++++-----
 2 files changed, 68 insertions(+), 42 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/58add5a2/src/resource_provider/storage/uri_disk_profile.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/uri_disk_profile.cpp b/src/resource_provider/storage/uri_disk_profile.cpp
index 5a48656..665798f 100644
--- a/src/resource_provider/storage/uri_disk_profile.cpp
+++ b/src/resource_provider/storage/uri_disk_profile.cpp
@@ -124,7 +124,7 @@ UriDiskProfileAdaptorProcess::UriDiskProfileAdaptorProcess(
     const Flags& _flags)
   : ProcessBase(ID::generate("uri-volume-profile")),
     flags(_flags),
-    watchPromise(new Promise<hashset<string>>()) {}
+    watchPromise(new Promise<Nothing>()) {}
 
 
 void UriDiskProfileAdaptorProcess::initialize()
@@ -138,11 +138,21 @@ Future<DiskProfileAdaptor::ProfileInfo>
       const string& profile,
       const ResourceProviderInfo& resourceProviderInfo)
 {
-  if (data.count(profile) != 1) {
+  if (profileMatrix.count(profile) != 1) {
     return Failure("Profile '" + profile + "' not found");
   }
 
-  return data.at(profile);
+  const DiskProfileMapping::CSIManifest& manifest = profileMatrix.at(profile);
+
+  // TODO(chhsiao): A storage resource provider may need to translate
+  // a profile that no longer applies to it to replay a `CREATE_VOLUME`
+  // or `CREATE_BLOCK` operation during recovery, so resource provider
+  // selection is only done in `watch()` but not here. We should do the
+  // selection once profiles are checkpointed in the resource provider.
+  return DiskProfileAdaptor::ProfileInfo{
+    manifest.volume_capabilities(),
+    manifest.create_parameters()
+  };
 }
 
 
@@ -150,11 +160,31 @@ Future<hashset<string>> UriDiskProfileAdaptorProcess::watch(
     const hashset<string>& knownProfiles,
     const ResourceProviderInfo& resourceProviderInfo)
 {
-  if (profiles != knownProfiles) {
-    return profiles;
+  // Calculate the new set of profiles for the resource provider.
+  // TODO(chhsiao): A storage resource provider assumes that the new set
+  // should be a superset of `knownProfiles`, so we bypass resource
+  // provider selection if a profile is already known. We should do the
+  // selection once profiles are checkpointed in the resource provider.
+  hashset<string> newProfiles = knownProfiles;
+  foreachpair (const string& profile,
+               const DiskProfileMapping::CSIManifest& manifest,
+               profileMatrix) {
+    if (knownProfiles.contains(profile)) {
+      continue;
+    }
+
+    if (isSelectedResourceProvider(manifest, resourceProviderInfo)) {
+      newProfiles.insert(profile);
+    }
   }
 
-  return watchPromise->future();
+  if (newProfiles != knownProfiles) {
+    return newProfiles;
+  }
+
+  // Wait for the next update if there is no change.
+  return watchPromise->future()
+    .then(defer(self(), &Self::watch, knownProfiles, resourceProviderInfo));
 }
 
 
@@ -212,7 +242,9 @@ void UriDiskProfileAdaptorProcess::notify(
 {
   bool hasErrors = false;
 
-  foreachkey (const string& profile, data) {
+  foreachpair (const string& profile,
+               const DiskProfileMapping::CSIManifest& manifest,
+               profileMatrix) {
     if (parsed.profile_matrix().count(profile) != 1) {
       hasErrors = true;
 
@@ -223,11 +255,11 @@ void UriDiskProfileAdaptorProcess::notify(
     }
 
     bool matchingCapability =
-      data.at(profile).capability ==
+      manifest.volume_capabilities() ==
         parsed.profile_matrix().at(profile).volume_capabilities();
 
     bool matchingParameters =
-      data.at(profile).parameters ==
+      manifest.create_parameters() ==
         parsed.profile_matrix().at(profile).create_parameters();
 
     if (!matchingCapability || !matchingParameters) {
@@ -249,35 +281,26 @@ void UriDiskProfileAdaptorProcess::notify(
 
   // Profiles can only be added, so if the parsed data is the same size,
   // nothing has changed and no notifications need to be sent.
-  if (parsed.profile_matrix().size() <= data.size()) {
+  if (parsed.profile_matrix().size() <= profileMatrix.size()) {
     return;
   }
 
   // The fetched mapping satisfies our invariants.
 
-  // Save the protobuf as a map we can expose through the module interface.
-  // And update the convenience set of profile names.
-  profiles.clear();
-  auto iterator = parsed.profile_matrix().begin();
-  while (iterator != parsed.profile_matrix().end()) {
-    data[iterator->first] = {
-      iterator->second.volume_capabilities(),
-      iterator->second.create_parameters()
-    };
-
-    profiles.insert(iterator->first);
-    iterator++;
-  }
+  // Save the protobuf as a map.
+  profileMatrix = map<string, DiskProfileMapping::CSIManifest>(
+      parsed.profile_matrix().begin(),
+      parsed.profile_matrix().end());
 
   // Notify any watchers and then prepare a new promise for the next
   // iteration of polling.
   //
   // TODO(josephw): Delay this based on the `--max_random_wait` option.
-  watchPromise->set(profiles);
-  watchPromise.reset(new Promise<hashset<string>>());
+  watchPromise->set(Nothing());
+  watchPromise.reset(new Promise<Nothing>());
 
   LOG(INFO)
-    << "Updated disk profile mapping to " << profiles.size()
+    << "Updated disk profile mapping to " << profileMatrix.size()
     << " total profiles";
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/58add5a2/src/resource_provider/storage/uri_disk_profile.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/uri_disk_profile.hpp b/src/resource_provider/storage/uri_disk_profile.hpp
index 746111e..22e9d8b 100644
--- a/src/resource_provider/storage/uri_disk_profile.hpp
+++ b/src/resource_provider/storage/uri_disk_profile.hpp
@@ -58,17 +58,23 @@ struct Flags : public virtual flags::FlagsBase
         "This module supports both HTTP(s) and file URIs\n."
         "\n"
         "The JSON object should consist of some top-level string keys\n"
-        "corresponding to the disk profile name. Each value should\n"
-        "contain a `VolumeCapability` under a 'volume_capabilities'\n"
+        "corresponding to the disk profile name. Each value should contain\n"
+        "a `ResourceProviderSelector` under 'resource_provider_selector' or\n"
+        "a `CSIPluginTypeSelector` under 'csi_plugin_type_selector' to\n"
+        "specify the set of resource providers this profile applies to,\n"
+        "followed by a `VolumeCapability` under 'volume_capabilities'\n"
         "and a free-form string-string mapping under 'create_parameters'.\n"
         "\n"
         "The JSON is modeled after a protobuf found in\n"
-        "`src/csi/uri_disk_profile.proto`.\n"
+        "`src/resource_provider/storage/disk_profile.proto`.\n"
         "\n"
         "For example:\n"
         "{\n"
         "  \"profile_matrix\" : {\n"
         "    \"my-profile\" : {\n"
+        "      \"csi_plugin_type_selector\": {\n"
+        "        \"plugin_type\" : \"org.apache.mesos.csi.test\"\n"
+        "      \"},\n"
         "      \"volume_capabilities\" : {\n"
         "        \"block\" : {},\n"
         "        \"access_mode\" : { \"mode\" : \"SINGLE_NODE_WRITER\" }\n"
@@ -171,14 +177,14 @@ struct Flags : public virtual flags::FlagsBase
 // and assumes that all fetched profiles are meant for all resource providers.
 //
 // See `Flags` above for more information.
-class UriDiskProfileAdaptor : public mesos::DiskProfileAdaptor
+class UriDiskProfileAdaptor : public DiskProfileAdaptor
 {
 public:
   UriDiskProfileAdaptor(const Flags& _flags);
 
   virtual ~UriDiskProfileAdaptor();
 
-  virtual process::Future<mesos::DiskProfileAdaptor::ProfileInfo> translate(
+  virtual process::Future<DiskProfileAdaptor::ProfileInfo> translate(
       const std::string& profile,
       const ResourceProviderInfo& resourceProviderInfo) override;
 
@@ -200,7 +206,7 @@ public:
 
   virtual void initialize() override;
 
-  process::Future<mesos::DiskProfileAdaptor::ProfileInfo> translate(
+  process::Future<DiskProfileAdaptor::ProfileInfo> translate(
       const std::string& profile,
       const ResourceProviderInfo& resourceProviderInfo);
 
@@ -226,19 +232,16 @@ private:
   Flags flags;
 
   // The last fetched profile mapping.
-  // This module assumes that profiles can only be added and never removed.
-  // Once added, profiles cannot be changed either.
+  // This module assumes that profiles can only be added and never
+  // removed. Once added, a profile's volume capability and parameters
+  // cannot be changed either.
   //
   // TODO(josephw): Consider persisting this mapping across agent restarts.
-  std::map<std::string, DiskProfileAdaptor::ProfileInfo> data;
+  std::map<std::string, resource_provider::DiskProfileMapping::CSIManifest>
+    profileMatrix;
 
-  // Convenience set of the keys in `data` above.
-  // This module does not filter based on `CSIPluginInfo::type`, so this
-  // is valid for all input to `watch(...)`.
-  hashset<std::string> profiles;
-
-  // Will be satisfied whenever `data` is changed.
-  process::Owned<process::Promise<hashset<std::string>>> watchPromise;
+  // Will be satisfied whenever `profileMatrix` is changed.
+  process::Owned<process::Promise<Nothing>> watchPromise;
 };
 
 } // namespace profile {