You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2019/04/03 23:14:05 UTC
[mesos] 08/15: Cleanup volume and storage pool listing.
This is an automated email from the ASF dual-hosted git repository.
chhsiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 0fddc51b2e7aebdd2c7409d06bba4870bf1c14a4
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Mon Apr 1 23:23:53 2019 -0700
Cleanup volume and storage pool listing.
This patch modifies SLRP's volume and storage pool listing methods to
conform to `VolumeManager`'s public interface. They will be moved out
of SLRP and into the v0 `VolumeManager` later.
Review: https://reviews.apache.org/r/70284/
---
src/resource_provider/storage/provider.cpp | 156 ++++++++++++---------
src/resource_provider/storage/provider_process.hpp | 9 +-
2 files changed, 95 insertions(+), 70 deletions(-)
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 5a63e7b..c5a5213 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -779,7 +779,7 @@ StorageLocalResourceProviderProcess::reconcileResourceProviderState()
{
return reconcileOperationStatuses()
.then(defer(self(), [=] {
- return collect(vector<Future<Resources>>{listVolumes(), getCapacities()})
+ return collect<Resources>({getRawVolumes(), getStoragePools()})
.then(defer(self(), [=](const vector<Resources>& discovered) {
ResourceConversion conversion = reconcileResources(
totalResources,
@@ -986,7 +986,7 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcileStoragePools()
fatal();
};
- return getCapacities()
+ return getStoragePools()
.then(defer(self(), [=](const Resources& discovered) {
ResourceConversion conversion = reconcileResources(
totalResources.filter(
@@ -1109,6 +1109,71 @@ ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
}
+Future<Resources> StorageLocalResourceProviderProcess::getRawVolumes()
+{
+ CHECK(info.has_id());
+
+ return listVolumes()
+ .then(defer(self(), [=](const vector<VolumeInfo>& volumeInfos) {
+ Resources resources;
+
+ // Recover disk profiles from the checkpointed state.
+ hashmap<string, string> volumesToProfiles;
+ foreach (const Resource& resource, totalResources) {
+ if (resource.disk().source().has_id() &&
+ resource.disk().source().has_profile()) {
+ volumesToProfiles.put(
+ resource.disk().source().id(),
+ resource.disk().source().profile());
+ }
+ }
+
+ foreach (const VolumeInfo& volumeInfo, volumeInfos) {
+ resources += createRawDiskResource(
+ info,
+ volumeInfo.capacity,
+ volumesToProfiles.contains(volumeInfo.id)
+ ? volumesToProfiles.at(volumeInfo.id)
+ : Option<string>::none(),
+ vendor,
+ volumeInfo.id,
+ volumeInfo.context.empty()
+ ? Option<Labels>::none()
+ : convertStringMapToLabels(volumeInfo.context));
+ }
+
+ return resources;
+ }));
+}
+
+
+Future<Resources> StorageLocalResourceProviderProcess::getStoragePools()
+{
+ CHECK(info.has_id());
+
+ vector<Future<Resources>> futures;
+
+ foreachpair (const string& profile,
+ const DiskProfileAdaptor::ProfileInfo& profileInfo,
+ profileInfos) {
+ futures.push_back(
+ getCapacity(profileInfo.capability, profileInfo.parameters)
+ .then(defer(self(), [=](const Bytes& capacity) -> Resources {
+ if (capacity == 0) {
+ return Resources();
+ }
+
+ return createRawDiskResource(info, capacity, profile, vendor);
+ })));
+ }
+
+ return collect(futures)
+ .then([](const vector<Resources>& resources) {
+ return accumulate(resources.begin(), resources.end(), Resources());
+ });
+}
+
+
void StorageLocalResourceProviderProcess::watchProfiles()
{
auto err = [](const string& message) {
@@ -2180,88 +2245,43 @@ Future<Option<Error>> StorageLocalResourceProviderProcess::validateVolume(
}
-Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
+Future<vector<VolumeInfo>> StorageLocalResourceProviderProcess::listVolumes()
{
- CHECK(info.has_id());
-
- // This is only used for reconciliation so no failure is returned.
if (!controllerCapabilities->listVolumes) {
- return Resources();
+ return vector<VolumeInfo>();
}
- // TODO(chhsiao): Set the max entries and use a loop to do
- // multiple `ListVolumes` calls.
- return call<csi::v0::LIST_VOLUMES>(
- csi::CONTROLLER_SERVICE, csi::v0::ListVolumesRequest())
- .then(defer(self(), [=](const csi::v0::ListVolumesResponse& response) {
- Resources resources;
-
- // Recover disk profiles from the checkpointed state.
- hashmap<string, string> volumesToProfiles;
- foreach (const Resource& resource, totalResources) {
- if (resource.disk().source().has_id() &&
- resource.disk().source().has_profile()) {
- volumesToProfiles.put(
- resource.disk().source().id(),
- resource.disk().source().profile());
- }
- }
-
+ // TODO(chhsiao): Set the max entries and use a loop to do multiple
+ // `ListVolumes` calls.
+ return call<LIST_VOLUMES>(CONTROLLER_SERVICE, ListVolumesRequest())
+ .then(process::defer(self(), [](const ListVolumesResponse& response) {
+ vector<VolumeInfo> result;
foreach (const auto& entry, response.entries()) {
- resources += createRawDiskResource(
- info,
- Bytes(entry.volume().capacity_bytes()),
- volumesToProfiles.contains(entry.volume().id())
- ? volumesToProfiles.at(entry.volume().id())
- : Option<string>::none(),
- vendor,
- entry.volume().id(),
- entry.volume().attributes().empty()
- ? Option<Labels>::none()
- : convertStringMapToLabels(entry.volume().attributes()));
+ result.push_back(VolumeInfo{Bytes(entry.volume().capacity_bytes()),
+ entry.volume().id(),
+ entry.volume().attributes()});
}
- return resources;
+ return result;
}));
}
-Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
+Future<Bytes> StorageLocalResourceProviderProcess::getCapacity(
+ const types::VolumeCapability& capability,
+ const Map<string, string>& parameters)
{
- CHECK(info.has_id());
-
- // This is only used for reconciliation so no failure is returned.
if (!controllerCapabilities->getCapacity) {
- return Resources();
+ return Bytes(0);
}
- vector<Future<Resources>> futures;
-
- foreachpair (const string& profile,
- const DiskProfileAdaptor::ProfileInfo& profileInfo,
- profileInfos) {
- csi::v0::GetCapacityRequest request;
- *request.add_volume_capabilities() =
- csi::v0::evolve(profileInfo.capability);
- *request.mutable_parameters() = profileInfo.parameters;
-
- futures.push_back(
- call<csi::v0::GET_CAPACITY>(
- csi::CONTROLLER_SERVICE, std::move(request))
- .then(defer(self(), [=](
- const csi::v0::GetCapacityResponse& response) -> Resources {
- if (response.available_capacity() == 0) {
- return Resources();
- }
-
- return createRawDiskResource(
- info, Bytes(response.available_capacity()), profile, vendor);
- })));
- }
+ GetCapacityRequest request;
+ *request.add_volume_capabilities() = csi::v0::evolve(capability);
+ *request.mutable_parameters() = parameters;
- return collect(futures)
- .then([](const vector<Resources>& resources) {
- return accumulate(resources.begin(), resources.end(), Resources());
+ return call<GET_CAPACITY>(CONTROLLER_SERVICE, std::move(request))
+ .then([](const GetCapacityResponse& response) {
+ return Bytes(response.available_capacity());
});
}
diff --git a/src/resource_provider/storage/provider_process.hpp b/src/resource_provider/storage/provider_process.hpp
index b298a8e..56d3682 100644
--- a/src/resource_provider/storage/provider_process.hpp
+++ b/src/resource_provider/storage/provider_process.hpp
@@ -160,6 +160,9 @@ private:
const Resources& checkpointed,
const Resources& discovered);
+ process::Future<Resources> getRawVolumes();
+ process::Future<Resources> getStoragePools();
+
// Spawns a loop to watch for changes in the set of known profiles and update
// the profile mapping and storage pools accordingly.
void watchProfiles();
@@ -249,10 +252,12 @@ private:
const google::protobuf::Map<std::string, std::string>& parameters);
// NOTE: This can only be called after `prepareServices`.
- process::Future<Resources> listVolumes();
+ process::Future<std::vector<csi::VolumeInfo>> listVolumes();
// NOTE: This can only be called after `prepareServices`.
- process::Future<Resources> getCapacities();
+ process::Future<Bytes> getCapacity(
+ const csi::types::VolumeCapability& capability,
+ const google::protobuf::Map<std::string, std::string>& parameters);
// Applies the operation. Speculative operations will be synchronously
// applied. Do nothing if the operation is already in a terminal state.