You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2019/08/20 10:02:36 UTC

[mesos] 06/07: Factored out storage provider method to update resources.

This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 266616aa6fe0027b07af80157bf7293735609d54
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Wed Aug 14 09:33:37 2019 +0200

    Factored out storage provider method to update resources.
    
    Review: https://reviews.apache.org/r/71150/
---
 src/resource_provider/storage/provider.cpp         | 120 ++++++++++-----------
 .../storage_local_resource_provider_tests.cpp      |  11 +-
 2 files changed, 64 insertions(+), 67 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 71aa650..2f91fe0 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -273,6 +273,10 @@ private:
   // truth, such as CSI plugin responses or the status update manager.
   Future<Nothing> reconcileResourceProviderState();
   Future<Nothing> reconcileOperationStatuses();
+
+  // Query the plugin for its resources and update the provider's state.
+  Future<Nothing> reconcileResources();
+
   ResourceConversion computeConversion(
       const Resources& checkpointed, const Resources& discovered) const;

@@ -295,10 +299,6 @@ private:
   // set of profiles it knows about.
   Future<Nothing> updateProfiles(const hashset<string>& profiles);
 
-  // Reconcile the storage pools when the set of known profiles changes,
-  // or a volume with an unknown profile is destroyed.
-  Future<Nothing> reconcileStoragePools();
-
   // Returns true if the storage pools are allowed to be reconciled when
   // the operation is being applied.
   static bool allowsReconciliation(const Offer::Operation& operation);
@@ -716,39 +716,63 @@ Future<Nothing>
 StorageLocalResourceProviderProcess::reconcileResourceProviderState()
 {
   return reconcileOperationStatuses()
-    .then(defer(self(), [=] {
-      return collect<vector<ResourceConversion>>(
-          {getExistingVolumes(), getStoragePools()})
-        .then(defer(self(), [=](
-            const vector<vector<ResourceConversion>>& collected) {
+    .then(defer(self(), &Self::reconcileResources));
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources()
+{
+  LOG(INFO) << "Reconciling storage pools and volumes";
+
+  return collect<vector<ResourceConversion>>(
+             {getExistingVolumes(), getStoragePools()})
+    .then(defer(
+        self(), [this](const vector<vector<ResourceConversion>>& collected) {
           Resources result = totalResources;
           foreach (const vector<ResourceConversion>& conversions, collected) {
             result = CHECK_NOTERROR(result.apply(conversions));
           }

+          bool shouldSendUpdate = false;
+
           if (result != totalResources) {
-            LOG(INFO)
-              << "Removing '" << (totalResources - result) << "' and adding '"
-              << (result - totalResources) << "' to the total resources";
+            LOG(INFO) << "Removing '" << (totalResources - result)
+                      << "' and adding '" << (result - totalResources)
+                      << "' to the total resources";

+            // Update the resource version since the total resources changed.
             totalResources = result;
+            resourceVersion = id::UUID::random();
+
             checkpointResourceProviderState();
+
+            shouldSendUpdate = true;
           }

-          // NOTE: Since this is the first `UPDATE_STATE` call of the
-          // current subscription, there must be no racing speculative
-          // operation, thus no need to update the resource version.
-          sendResourceProviderStateUpdate();
-          statusUpdateManager.resume();
+          switch (state) {
+            case RECOVERING:
+            case DISCONNECTED:
+            case CONNECTED:
+            case SUBSCRIBED: {
+              LOG(INFO) << "Resource provider " << info.id()
+                        << " is in READY state";

-          LOG(INFO)
-            << "Resource provider " << info.id() << " is in READY state";
+              state = READY;
+              // This is the first resource update of the current subscription.
+              shouldSendUpdate = true;
+              break;
+            }
+            case READY:
+              break;
+          }

-          state = READY;
+          if (shouldSendUpdate) {
+            sendResourceProviderStateUpdate();
+            statusUpdateManager.resume();
+          }

           return Nothing();
         }));
-    }));
 }


@@ -913,44 +937,6 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses()
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::reconcileStoragePools()
-{
-  CHECK_PENDING(reconciled);
-
-  auto die = [=](const string& message) {
-    LOG(ERROR)
-      << "Failed to reconcile storage pools for resource provider " << info.id()
-      << ": " << message;
-    fatal();
-  };
-
-  return getStoragePools()
-    .then(defer(self(), [=](const vector<ResourceConversion>& conversions) {
-      Resources result = CHECK_NOTERROR(totalResources.apply(conversions));
-
-      if (result != totalResources) {
-        LOG(INFO)
-          << "Removing '" << (totalResources - result) << "' and adding '"
-          << (result - totalResources) << "' to the total resources";
-
-        totalResources = result;
-        checkpointResourceProviderState();
-
-        // NOTE: We always update the resource version before sending
-        // an `UPDATE_STATE`, so that any racing speculative operation
-        // will be rejected. Otherwise, the speculative resource
-        // conversion done on the master will be cancelled out.
-        resourceVersion = id::UUID::random();
-        sendResourceProviderStateUpdate();
-      }
-
-      return Nothing();
-    }))
-    .onFailed(defer(self(), std::bind(die, lambda::_1)))
-    .onDiscarded(defer(self(), std::bind(die, "future discarded")));
-}
-
-
 bool StorageLocalResourceProviderProcess::allowsReconciliation(
     const Offer::Operation& operation)
 {
@@ -1182,7 +1168,7 @@ void StorageLocalResourceProviderProcess::watchProfiles()
 
         std::function<Future<Nothing>()> update = defer(self(), [=] {
           return updateProfiles(profiles)
-            .then(defer(self(), &Self::reconcileStoragePools));
+            .then(defer(self(), &Self::reconcileResources));
         });
 
         // Update the profile mapping and storage pools in `sequence` to wait
@@ -1871,8 +1857,18 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
             // pending operation that disallow reconciliation to finish, and set
             // up `reconciled` to drop incoming operations that disallow
             // reconciliation until the storage pools are reconciled.
-            reconciled = sequence.add(std::function<Future<Nothing>()>(
-                defer(self(), &Self::reconcileStoragePools)));
+            auto err = [](const Resource& resource, const string& message) {
+              LOG(ERROR)
+                << "Failed to reconcile storage pools after resource "
+                << "'" << resource << "' has been freed: " << message;
+            };
+
+            reconciled =
+              sequence
+                .add(std::function<Future<Nothing>()>(
+                    defer(self(), &Self::reconcileResources)))
+                .onFailed(std::bind(err, resource, lambda::_1))
+                .onDiscarded(std::bind(err, resource, "future discarded"));
           }
         }
       } else {
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index f27f5c1..7624ea1 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -5129,12 +5129,13 @@ TEST_P(StorageLocalResourceProviderTest, CsiPluginRpcMetrics)
   ASSERT_SOME(source);

   // We expect that the following RPC calls are made during startup: `Probe`,
-  // `GetPluginInfo` (2), `GetPluginCapabilities, `ControllerGetCapabilities`,
-  // `ListVolumes`, `GetCapacity`, `NodeGetCapabilities`, `NodeGetId`.
+  // `GetPluginInfo` (2), `GetPluginCapabilities`, `ControllerGetCapabilities`,
+  // `ListVolumes` (2), `GetCapacity`, `NodeGetCapabilities`,
+  // `NodeGetId`.
   //
   // TODO(chhsiao): As these are implementation details, we should count the
   // calls processed by a mock CSI plugin and check the metrics against that.
-  const int numFinishedStartupRpcs = 9;
+  const int numFinishedStartupRpcs = 10;

   EXPECT_TRUE(metricEquals(
       metricName("csi_plugin/rpcs_finished"), numFinishedStartupRpcs));
@@ -5870,11 +5871,11 @@ TEST_P(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
 
   // We expect that the following RPC calls are made during startup: `Probe`,
   // `GetPluginInfo` (2), `GetPluginCapabilities, `ControllerGetCapabilities`,
-  // `ListVolumes`, `GetCapacity`, `NodeGetCapabilities`, `NodeGetId`.
+  // `ListVolumes` (2), `GetCapacity`, `NodeGetCapabilities`, `NodeGetId`.
   //
   // TODO(chhsiao): As these are implementation details, we should count the
   // calls processed by a mock CSI plugin and check the metrics against that.
-  const int numFinishedStartupRpcs = 9;
+  const int numFinishedStartupRpcs = 10;
 
   EXPECT_TRUE(metricEquals(
       metricName("csi_plugin/rpcs_finished"), numFinishedStartupRpcs));