You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/04/03 23:14:05 UTC

[mesos] 08/15: Cleanup volume and storage pool listing.

This is an automated email from the ASF dual-hosted git repository.

chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 0fddc51b2e7aebdd2c7409d06bba4870bf1c14a4
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Mon Apr 1 23:23:53 2019 -0700

    Cleanup volume and storage pool listing.
    
    This patch modifies SLRP's volume and storage pool listing methods to
    conform to `VolumeManager`'s public interface. They will be moved out
    of SLRP and into the v0 `VolumeManager` later.
    
    Review: https://reviews.apache.org/r/70284/
---
 src/resource_provider/storage/provider.cpp         | 156 ++++++++++++---------
 src/resource_provider/storage/provider_process.hpp |   9 +-
 2 files changed, 95 insertions(+), 70 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 5a63e7b..c5a5213 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -779,7 +779,7 @@ StorageLocalResourceProviderProcess::reconcileResourceProviderState()
 {
   return reconcileOperationStatuses()
     .then(defer(self(), [=] {
-      return collect(vector<Future<Resources>>{listVolumes(), getCapacities()})
+      return collect<Resources>({getRawVolumes(), getStoragePools()})
         .then(defer(self(), [=](const vector<Resources>& discovered) {
           ResourceConversion conversion = reconcileResources(
               totalResources,
@@ -986,7 +986,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcileStoragePools()
     fatal();
   };
 
-  return getCapacities()
+  return getStoragePools()
     .then(defer(self(), [=](const Resources& discovered) {
       ResourceConversion conversion = reconcileResources(
           totalResources.filter(
@@ -1109,6 +1109,71 @@ ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
 }
 
 
+Future<Resources> StorageLocalResourceProviderProcess::getRawVolumes()
+{
+  CHECK(info.has_id());
+
+  return listVolumes()
+    .then(defer(self(), [=](const vector<VolumeInfo>& volumeInfos) {
+      Resources resources;
+
+      // Recover disk profiles from the checkpointed state.
+      hashmap<string, string> volumesToProfiles;
+      foreach (const Resource& resource, totalResources) {
+        if (resource.disk().source().has_id() &&
+            resource.disk().source().has_profile()) {
+          volumesToProfiles.put(
+              resource.disk().source().id(),
+              resource.disk().source().profile());
+        }
+      }
+
+      foreach (const VolumeInfo& volumeInfo, volumeInfos) {
+        resources += createRawDiskResource(
+            info,
+            volumeInfo.capacity,
+            volumesToProfiles.contains(volumeInfo.id)
+              ? volumesToProfiles.at(volumeInfo.id)
+              : Option<string>::none(),
+            vendor,
+            volumeInfo.id,
+            volumeInfo.context.empty()
+              ? Option<Labels>::none()
+              : convertStringMapToLabels(volumeInfo.context));
+      }
+
+      return resources;
+    }));
+}
+
+
+Future<Resources> StorageLocalResourceProviderProcess::getStoragePools()
+{
+  CHECK(info.has_id());
+
+  vector<Future<Resources>> futures;
+
+  foreachpair (const string& profile,
+               const DiskProfileAdaptor::ProfileInfo& profileInfo,
+               profileInfos) {
+    futures.push_back(
+        getCapacity(profileInfo.capability, profileInfo.parameters)
+          .then(defer(self(), [=](const Bytes& capacity) -> Resources {
+            if (capacity == 0) {
+              return Resources();
+            }
+
+            return createRawDiskResource(info, capacity, profile, vendor);
+          })));
+  }
+
+  return collect(futures)
+    .then([](const vector<Resources>& resources) {
+      return accumulate(resources.begin(), resources.end(), Resources());
+    });
+}
+
+
 void StorageLocalResourceProviderProcess::watchProfiles()
 {
   auto err = [](const string& message) {
@@ -2180,88 +2245,43 @@ Future<Option<Error>> StorageLocalResourceProviderProcess::validateVolume(
 }
 
 
-Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
+Future<vector<VolumeInfo>> StorageLocalResourceProviderProcess::listVolumes()
 {
-  CHECK(info.has_id());
-
-  // This is only used for reconciliation so no failure is returned.
   if (!controllerCapabilities->listVolumes) {
-    return Resources();
+    return vector<VolumeInfo>();
   }
 
-  // TODO(chhsiao): Set the max entries and use a loop to do
-  // multiple `ListVolumes` calls.
-  return call<csi::v0::LIST_VOLUMES>(
-      csi::CONTROLLER_SERVICE, csi::v0::ListVolumesRequest())
-    .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
-      Resources resources;
-
-      // Recover disk profiles from the checkpointed state.
-      hashmap<string, string> volumesToProfiles;
-      foreach (const Resource& resource, totalResources) {
-        if (resource.disk().source().has_id() &&
-            resource.disk().source().has_profile()) {
-          volumesToProfiles.put(
-              resource.disk().source().id(),
-              resource.disk().source().profile());
-        }
-      }
-
+  // TODO(chhsiao): Set the max entries and use a loop to do multiple
+  // `ListVolumes` calls.
+  return call<LIST_VOLUMES>(CONTROLLER_SERVICE, ListVolumesRequest())
+    .then(process::defer(self(), [](const ListVolumesResponse& response) {
+      vector<VolumeInfo> result;
       foreach (const auto& entry, response.entries()) {
-        resources += createRawDiskResource(
-            info,
-            Bytes(entry.volume().capacity_bytes()),
-            volumesToProfiles.contains(entry.volume().id())
-              ? volumesToProfiles.at(entry.volume().id())
-              : Option<string>::none(),
-            vendor,
-            entry.volume().id(),
-            entry.volume().attributes().empty()
-              ? Option<Labels>::none()
-              : convertStringMapToLabels(entry.volume().attributes()));
+        result.push_back(VolumeInfo{Bytes(entry.volume().capacity_bytes()),
+                                    entry.volume().id(),
+                                    entry.volume().attributes()});
       }
 
-      return resources;
+      return result;
     }));
 }
 
 
-Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
+Future<Bytes> StorageLocalResourceProviderProcess::getCapacity(
+    const types::VolumeCapability& capability,
+    const Map<string, string>& parameters)
 {
-  CHECK(info.has_id());
-
-  // This is only used for reconciliation so no failure is returned.
   if (!controllerCapabilities->getCapacity) {
-    return Resources();
+    return Bytes(0);
   }
 
-  vector<Future<Resources>> futures;
-
-  foreachpair (const string& profile,
-               const DiskProfileAdaptor::ProfileInfo& profileInfo,
-               profileInfos) {
-    csi::v0::GetCapacityRequest request;
-    *request.add_volume_capabilities() =
-      csi::v0::evolve(profileInfo.capability);
-    *request.mutable_parameters() = profileInfo.parameters;
-
-    futures.push_back(
-        call<csi::v0::GET_CAPACITY>(
-            csi::CONTROLLER_SERVICE, std::move(request))
-          .then(defer(self(), [=](
-              const csi::v0::GetCapacityResponse& response) -> Resources {
-            if (response.available_capacity() == 0) {
-              return Resources();
-            }
-
-            return createRawDiskResource(
-                info, Bytes(response.available_capacity()), profile, vendor);
-          })));
-  }
+  GetCapacityRequest request;
+  *request.add_volume_capabilities() = csi::v0::evolve(capability);
+  *request.mutable_parameters() = parameters;
 
-  return collect(futures)
-    .then([](const vector<Resources>& resources) {
-      return accumulate(resources.begin(), resources.end(), Resources());
+  return call<GET_CAPACITY>(CONTROLLER_SERVICE, std::move(request))
+    .then([](const GetCapacityResponse& response) {
+      return Bytes(response.available_capacity());
     });
 }
 
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
index b298a8e..56d3682 100644
--- a/src/resource_provider/storage/provider_process.hpp
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -160,6 +160,9 @@ private:
       const Resources& checkpointed,
       const Resources& discovered);
 
+  process::Future<Resources> getRawVolumes();
+  process::Future<Resources> getStoragePools();
+
   // Spawns a loop to watch for changes in the set of known profiles and update
   // the profile mapping and storage pools accordingly.
   void watchProfiles();
@@ -249,10 +252,12 @@ private:
       const google::protobuf::Map<std::string, std::string>& parameters);
 
   // NOTE: This can only be called after `prepareServices`.
-  process::Future<Resources> listVolumes();
+  process::Future<std::vector<csi::VolumeInfo>> listVolumes();
 
   // NOTE: This can only be called after `prepareServices`.
-  process::Future<Resources> getCapacities();
+  process::Future<Bytes> getCapacity(
+      const csi::types::VolumeCapability& capability,
+      const google::protobuf::Map<std::string, std::string>& parameters);
 
   // Applies the operation. Speculative operations will be synchronously
   // applied. Do nothing if the operation is already in a terminal state.