You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2019/08/20 10:02:36 UTC
[mesos] 06/07: Factored out storage provider method to update
resources.
This is an automated email from the ASF dual-hosted git repository.
bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
commit 266616aa6fe0027b07af80157bf7293735609d54
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Wed Aug 14 09:33:37 2019 +0200
Factored out storage provider method to update resources.
Review: https://reviews.apache.org/r/71150/
---
src/resource_provider/storage/provider.cpp | 120 ++++++++++-----------
.../storage_local_resource_provider_tests.cpp | 11 +-
2 files changed, 64 insertions(+), 67 deletions(-)
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 71aa650..2f91fe0 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -273,6 +273,10 @@ private:
// truth, such as CSI plugin responses or the status update manager.
Future<Nothing> reconcileResourceProviderState();
Future<Nothing> reconcileOperationStatuses();
+
+ // Query the plugin for its resources and update the provider's state.
+ Future<Nothing> reconcileResources();
+
ResourceConversion computeConversion(
const Resources& checkpointed, const Resources& discovered) const;
@@ -295,10 +299,6 @@ private:
// set of profiles it knows about.
Future<Nothing> updateProfiles(const hashset<string>& profiles);
- // Reconcile the storage pools when the set of known profiles changes,
- // or a volume with an unknown profile is destroyed.
- Future<Nothing> reconcileStoragePools();
-
// Returns true if the storage pools are allowed to be reconciled when
// the operation is being applied.
static bool allowsReconciliation(const Offer::Operation& operation);
@@ -716,39 +716,63 @@ Future<Nothing>
StorageLocalResourceProviderProcess::reconcileResourceProviderState()
{
return reconcileOperationStatuses()
- .then(defer(self(), [=] {
- return collect<vector<ResourceConversion>>(
- {getExistingVolumes(), getStoragePools()})
- .then(defer(self(), [=](
- const vector<vector<ResourceConversion>>& collected) {
+ .then(defer(self(), &Self::reconcileResources));
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources()
+{
+ LOG(INFO) << "Reconciling storage pools and volumes";
+
+ return collect<vector<ResourceConversion>>(
+ {getExistingVolumes(), getStoragePools()})
+ .then(defer(
+ self(), [this](const vector<vector<ResourceConversion>>& collected) {
Resources result = totalResources;
foreach (const vector<ResourceConversion>& conversions, collected) {
result = CHECK_NOTERROR(result.apply(conversions));
}
+ bool shouldSendUpdate = false;
+
if (result != totalResources) {
- LOG(INFO)
- << "Removing '" << (totalResources - result) << "' and adding '"
- << (result - totalResources) << "' to the total resources";
+ LOG(INFO) << "Removing '" << (totalResources - result)
+ << "' and adding '" << (result - totalResources)
+ << "' to the total resources";
+ // Update the resource version since the total resources changed.
totalResources = result;
+ resourceVersion = id::UUID::random();
+
checkpointResourceProviderState();
+
+ shouldSendUpdate = true;
}
- // NOTE: Since this is the first `UPDATE_STATE` call of the
- // current subscription, there must be no racing speculative
- // operation, thus no need to update the resource version.
- sendResourceProviderStateUpdate();
- statusUpdateManager.resume();
+ switch (state) {
+ case RECOVERING:
+ case DISCONNECTED:
+ case CONNECTED:
+ case SUBSCRIBED: {
+ LOG(INFO) << "Resource provider " << info.id()
+ << " is in READY state";
- LOG(INFO)
- << "Resource provider " << info.id() << " is in READY state";
+ state = READY;
+
+ // This is the first resource update of the current subscription.
+ shouldSendUpdate = true;
+ }
+ case READY:
+ break;
+ }
- state = READY;
+ if (shouldSendUpdate) {
+ sendResourceProviderStateUpdate();
+ statusUpdateManager.resume();
+ }
return Nothing();
}));
- }));
}
@@ -913,44 +937,6 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses()
}
-Future<Nothing> StorageLocalResourceProviderProcess::reconcileStoragePools()
-{
- CHECK_PENDING(reconciled);
-
- auto die = [=](const string& message) {
- LOG(ERROR)
- << "Failed to reconcile storage pools for resource provider " << info.id()
- << ": " << message;
- fatal();
- };
-
- return getStoragePools()
- .then(defer(self(), [=](const vector<ResourceConversion>& conversions) {
- Resources result = CHECK_NOTERROR(totalResources.apply(conversions));
-
- if (result != totalResources) {
- LOG(INFO)
- << "Removing '" << (totalResources - result) << "' and adding '"
- << (result - totalResources) << "' to the total resources";
-
- totalResources = result;
- checkpointResourceProviderState();
-
- // NOTE: We always update the resource version before sending
- // an `UPDATE_STATE`, so that any racing speculative operation
- // will be rejected. Otherwise, the speculative resource
- // conversion done on the master will be cancelled out.
- resourceVersion = id::UUID::random();
- sendResourceProviderStateUpdate();
- }
-
- return Nothing();
- }))
- .onFailed(defer(self(), std::bind(die, lambda::_1)))
- .onDiscarded(defer(self(), std::bind(die, "future discarded")));
-}
-
-
bool StorageLocalResourceProviderProcess::allowsReconciliation(
const Offer::Operation& operation)
{
@@ -1182,7 +1168,7 @@ void StorageLocalResourceProviderProcess::watchProfiles()
std::function<Future<Nothing>()> update = defer(self(), [=] {
return updateProfiles(profiles)
- .then(defer(self(), &Self::reconcileStoragePools));
+ .then(defer(self(), &Self::reconcileResources));
});
// Update the profile mapping and storage pools in `sequence` to wait
@@ -1871,8 +1857,18 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
// pending operation that disallow reconciliation to finish, and set
// up `reconciled` to drop incoming operations that disallow
// reconciliation until the storage pools are reconciled.
- reconciled = sequence.add(std::function<Future<Nothing>()>(
- defer(self(), &Self::reconcileStoragePools)));
+ auto err = [](const Resource& resource, const string& message) {
+ LOG(ERROR)
+ << "Failed to reconcile storage pools after resource "
+ << "'" << resource << "' has been freed: " << message;
+ };
+
+ reconciled =
+ sequence
+ .add(std::function<Future<Nothing>()>(
+ defer(self(), &Self::reconcileResources)))
+ .onFailed(std::bind(err, resource, lambda::_1))
+ .onDiscard(std::bind(err, resource, "future discarded"));
}
}
} else {
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index f27f5c1..7624ea1 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -5129,12 +5129,13 @@ TEST_P(StorageLocalResourceProviderTest, CsiPluginRpcMetrics)
ASSERT_SOME(source);
// We expect that the following RPC calls are made during startup: `Probe`,
- // `GetPluginInfo` (2), `GetPluginCapabilities, `ControllerGetCapabilities`,
- // `ListVolumes`, `GetCapacity`, `NodeGetCapabilities`, `NodeGetId`.
+ // `GetPluginInfo` (2), `GetCapacity`, `GetPluginCapabilities`,
+ // `ControllerGetCapabilities`, `ListVolumes` (2), `NodeGetCapabilities`,
+ // `NodeGetId`.
//
// TODO(chhsiao): As these are implementation details, we should count the
// calls processed by a mock CSI plugin and check the metrics against that.
- const int numFinishedStartupRpcs = 9;
+ const int numFinishedStartupRpcs = 10;
EXPECT_TRUE(metricEquals(
metricName("csi_plugin/rpcs_finished"), numFinishedStartupRpcs));
@@ -5870,11 +5871,11 @@ TEST_P(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
// We expect that the following RPC calls are made during startup: `Probe`,
// `GetPluginInfo` (2), `GetPluginCapabilities, `ControllerGetCapabilities`,
- // `ListVolumes`, `GetCapacity`, `NodeGetCapabilities`, `NodeGetId`.
+ // `ListVolumes` (2), `GetCapacity`, `NodeGetCapabilities`, `NodeGetId`.
//
// TODO(chhsiao): As these are implementation details, we should count the
// calls processed by a mock CSI plugin and check the metrics against that.
- const int numFinishedStartupRpcs = 9;
+ const int numFinishedStartupRpcs = 10;
EXPECT_TRUE(metricEquals(
metricName("csi_plugin/rpcs_finished"), numFinishedStartupRpcs));