Posted to commits@mesos.apache.org by bb...@apache.org on 2019/08/20 10:02:30 UTC

[mesos] branch master updated (c797993 -> deeae14)

This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git.


    from c797993  Added a test for RoleTree for basic add and remove operations.
     new d63be44  Fixed signature to pass parameter by const ref instead of value.
     new 0fdeae6  Clarified a comment in storage local resource provider tests.
     new 6e8ddb0  Updated config factory to set resource provider reconciliation interval.
     new 3f7ab74  Explicitly disabled periodic reconciliation for some provider tests.
     new 82cdd40  Renamed a storage provider function.
     new 266616a  Factored out storage provider method to update resources.
     new deeae14  Performed periodic storage local provider reconciliations.

The 7 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/resource_provider/storage/provider.cpp         | 204 ++++++++++-------
 .../storage_local_resource_provider_tests.cpp      | 249 ++++++++++++++++++---
 2 files changed, 349 insertions(+), 104 deletions(-)


[mesos] 01/07: Fixed signature to pass parameter by const ref instead of value.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit d63be44d94c5763a26c97990ac4236d60a4c2c18
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Wed Aug 14 09:33:14 2019 +0200

    Fixed signature to pass parameter by const ref instead of value.
    
    Review: https://reviews.apache.org/r/71145/
---
 src/tests/storage_local_resource_provider_tests.cpp | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 69b59d4..57b0c55 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -243,10 +243,10 @@ public:
 
   void setupResourceProviderConfig(
       const Bytes& capacity,
-      const Option<string> volumes = None(),
-      const Option<string> forward = None(),
-      const Option<string> createParameters = None(),
-      const Option<string> volumeMetadata = None())
+      const Option<string>& volumes = None(),
+      const Option<string>& forward = None(),
+      const Option<string>& createParameters = None(),
+      const Option<string>& volumeMetadata = None())
   {
     const string testCsiPluginPath =
       path::join(tests::flags.build_dir, "src", "test-csi-plugin");
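
Editor's note: the change above switches the optional parameters from pass-by-value to
pass-by-const-reference, which avoids copying the contained strings on every call while
still allowing `None()` defaults. A minimal standalone sketch of the same idiom, using
`std::optional` as a stand-in for stout's `Option` (the function name and arguments here
are illustrative only):

    #include <cstdint>
    #include <iostream>
    #include <optional>
    #include <string>

    // Stand-in for stout's `Option<T>`; `std::nullopt` plays the role of
    // `None()`. Taking the parameters by const reference avoids copying the
    // optional strings while keeping the defaulted-argument call style.
    void setupResourceProviderConfig(
        uint64_t capacityBytes,
        const std::optional<std::string>& volumes = std::nullopt,
        const std::optional<std::string>& forward = std::nullopt)
    {
      std::cout << "capacity: " << capacityBytes << " bytes, volumes: "
                << volumes.value_or("<none>") << ", forward: "
                << forward.value_or("<none>") << std::endl;
    }

    int main()
    {
      setupResourceProviderConfig(4096);                  // all defaults
      setupResourceProviderConfig(4096, "volume1:2048");  // temporary binds to
      return 0;                                           // the const reference
    }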


[mesos] 05/07: Renamed a storage provider function.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 82cdd40968df7b1d4b2fb4bede5253b20ac2319f
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Wed Aug 14 09:33:32 2019 +0200

    Renamed a storage provider function.
    
    Review: https://reviews.apache.org/r/71149/
---
 src/resource_provider/storage/provider.cpp | 14 ++++++--------
 1 file changed, 6 insertions(+), 8 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 6d63260..71aa650 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -273,9 +273,8 @@ private:
   // truth, such as CSI plugin responses or the status update manager.
   Future<Nothing> reconcileResourceProviderState();
   Future<Nothing> reconcileOperationStatuses();
-  ResourceConversion reconcileResources(
-      const Resources& checkpointed,
-      const Resources& discovered);
+  ResourceConversion computeConversion(
+      const Resources& checkpointed, const Resources& discovered) const;
 
   // Returns a list of resource conversions to updates volume contexts for
   // existing volumes, remove disappeared unconverted volumes, and add newly
@@ -992,9 +991,8 @@ bool StorageLocalResourceProviderProcess::allowsReconciliation(
 }
 
 
-ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
-    const Resources& checkpointed,
-    const Resources& discovered)
+ResourceConversion StorageLocalResourceProviderProcess::computeConversion(
+    const Resources& checkpointed, const Resources& discovered) const
 {
   // NOTE: If a resource in the checkpointed resources is missing in the
   // discovered resources, we will still keep it if it is converted by
@@ -1120,7 +1118,7 @@ StorageLocalResourceProviderProcess::getExistingVolumes()
 
       return vector<ResourceConversion>{
         std::move(metadataConversion),
-        reconcileResources(std::move(checkpointed), std::move(discovered))};
+        computeConversion(std::move(checkpointed), std::move(discovered))};
     }));
 }
 
@@ -1158,7 +1156,7 @@ StorageLocalResourceProviderProcess::getStoragePools()
         });
 
       return vector<ResourceConversion>{
-        reconcileResources(std::move(checkpointed), std::move(discovered))};
+        computeConversion(std::move(checkpointed), std::move(discovered))};
     }));
 }
 


[mesos] 02/07: Clarified a comment in storage local resource provider tests.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 0fdeae60a1b675e71447a1189fb0f15b1b4f0129
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Wed Aug 14 09:33:19 2019 +0200

    Clarified a comment in storage local resource provider tests.
    
    Review: https://reviews.apache.org/r/71146/
---
 .../storage_local_resource_provider_tests.cpp      | 28 +++++++++++-----------
 1 file changed, 14 insertions(+), 14 deletions(-)

diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 57b0c55..8b9009c 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -612,7 +612,7 @@ TEST_P(StorageLocalResourceProviderTest, NoResource)
 
   slave::Flags slaveFlags = CreateSlaveFlags();
 
-  // Since the local resource provider daemon is started after the agent
+  // Since the local resource provider gets subscribed after the agent
   // is registered, it is guaranteed that the slave will send two
   // `UpdateSlaveMessage`s, where the latter one contains resources from
   // the storage local resource provider.
@@ -648,7 +648,7 @@ TEST_P(StorageLocalResourceProviderTest, NoResource)
   // Restart the agent.
   slave.get()->terminate();
 
-  // Since the local resource provider daemon is started after the agent
+  // Since the local resource provider gets subscribed after the agent
   // is registered, it is guaranteed that the slave will send two
   // `UpdateSlaveMessage`s, where the latter one contains resources from
   // the storage local resource provider.
@@ -693,7 +693,7 @@ TEST_P(StorageLocalResourceProviderTest, DISABLED_ZeroSizedDisk)
 
   slave::Flags slaveFlags = CreateSlaveFlags();
 
-  // Since the local resource provider daemon is started after the agent
+  // Since the local resource provider gets subscribed after the agent
   // is registered, it is guaranteed that the slave will send two
   // `UpdateSlaveMessage`s, where the latter one contains resources from
   // the storage local resource provider.
@@ -845,7 +845,7 @@ TEST_P(StorageLocalResourceProviderTest, ProfileAppeared)
   slave::Flags slaveFlags = CreateSlaveFlags();
   slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
-  // Since the local resource provider daemon is started after the agent
+  // Since the local resource provider gets subscribed after the agent
   // is registered, it is guaranteed that the slave will send two
   // `UpdateSlaveMessage`s, where the latter one contains resources from
   // the storage local resource provider.
@@ -1412,7 +1412,7 @@ TEST_P(StorageLocalResourceProviderTest, ProfileDisappeared)
   slave::Flags slaveFlags = CreateSlaveFlags();
   slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
-  // Since the local resource provider daemon is started after the agent
+  // Since the local resource provider gets subscribed after the agent
   // is registered, it is guaranteed that the slave will send two
   // `UpdateSlaveMessage`s, where the latter one contains resources from
   // the storage local resource provider.
@@ -4235,7 +4235,7 @@ TEST_P(StorageLocalResourceProviderTest, RetryOperationStatusUpdate)
   slave::Flags flags = CreateSlaveFlags();
   flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
-  // Since the local resource provider daemon is started after the agent
+  // Since the local resource provider gets subscribed after the agent
   // is registered, it is guaranteed that the slave will send two
   // `UpdateSlaveMessage`s, where the latter one contains resources from
   // the storage local resource provider.
@@ -4397,7 +4397,7 @@ TEST_P(
   slave::Flags flags = CreateSlaveFlags();
   flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
-  // Since the local resource provider daemon is started after the agent
+  // Since the local resource provider gets subscribed after the agent
   // is registered, it is guaranteed that the slave will send two
   // `UpdateSlaveMessage`s, where the latter one contains resources from
   // the storage local resource provider.
@@ -4490,7 +4490,7 @@ TEST_P(
   // Restart the agent.
   slave.get()->terminate();
 
-  // Since the local resource provider daemon is started after the agent
+  // Since the local resource provider gets subscribed after the agent
   // is registered, it is guaranteed that the slave will send two
   // `UpdateSlaveMessage`s, where the latter one contains resources from
   // the storage local resource provider.
@@ -4579,7 +4579,7 @@ TEST_P(StorageLocalResourceProviderTest, ContainerTerminationMetric)
 
   Owned<slave::MesosContainerizer> containerizer(_containerizer.get());
 
-  // Since the local resource provider daemon is started after the agent
+  // Since the local resource provider gets subscribed after the agent
   // is registered, it is guaranteed that the slave will send two
   // `UpdateSlaveMessage`s, where the latter one contains information from
   // the storage local resource provider.
@@ -4684,7 +4684,7 @@ TEST_P(StorageLocalResourceProviderTest, OperationUpdate)
   slave::Flags flags = CreateSlaveFlags();
   flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
-  // Since the local resource provider daemon is started after the agent
+  // Since the local resource provider gets subscribed after the agent
   // is registered, it is guaranteed that the slave will send two
   // `UpdateSlaveMessage`s, where the latter one contains resources from
   // the storage local resource provider.
@@ -5222,7 +5222,7 @@ TEST_P(StorageLocalResourceProviderTest, ReconcileDroppedOperation)
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
 
-  // Since the local resource provider daemon is started after the agent is
+  // Since the local resource provider gets subscribed after the agent is
   // registered, it is guaranteed that the agent will send two
   // `UpdateSlaveMessage`s, where the latter one contains resources from the
   // storage local resource provider.
@@ -5604,7 +5604,7 @@ TEST_P(
   slave::Flags flags = CreateSlaveFlags();
   flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
-  // Since the local resource provider daemon is started after the agent
+  // Since the local resource provider gets subscribed after the agent
   // is registered, it is guaranteed that the slave will send two
   // `UpdateSlaveMessage`s, where the latter one contains resources from
   // the storage local resource provider.
@@ -5802,7 +5802,7 @@ TEST_P(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
   slave::Flags flags = CreateSlaveFlags();
   flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
-  // Since the local resource provider daemon is started after the agent
+  // Since the local resource provider gets subscribed after the agent
   // is registered, it is guaranteed that the slave will send two
   // `UpdateSlaveMessage`s, where the latter one contains resources from
   // the storage local resource provider.
@@ -6397,7 +6397,7 @@ TEST_P(
   slave::Flags slaveFlags = CreateSlaveFlags();
   slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
-  // Since the local resource provider daemon is started after the agent
+  // Since the local resource provider gets subscribed after the agent
   // is registered, it is guaranteed that the slave will send two
   // `UpdateSlaveMessage`s, where the latter one contains resources from
   // the storage local resource provider.


[mesos] 06/07: Factored out storage provider method to update resources.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 266616aa6fe0027b07af80157bf7293735609d54
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Wed Aug 14 09:33:37 2019 +0200

    Factored out storage provider method to update resources.
    
    Review: https://reviews.apache.org/r/71150/
---
 src/resource_provider/storage/provider.cpp         | 120 ++++++++++-----------
 .../storage_local_resource_provider_tests.cpp      |  11 +-
 2 files changed, 64 insertions(+), 67 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 71aa650..2f91fe0 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -273,6 +273,10 @@ private:
   // truth, such as CSI plugin responses or the status update manager.
   Future<Nothing> reconcileResourceProviderState();
   Future<Nothing> reconcileOperationStatuses();
+
+  // Query the plugin for its resources and update the providers state.
+  Future<Nothing> reconcileResources();
+
   ResourceConversion computeConversion(
       const Resources& checkpointed, const Resources& discovered) const;
 
@@ -295,10 +299,6 @@ private:
   // set of profiles it knows about.
   Future<Nothing> updateProfiles(const hashset<string>& profiles);
 
-  // Reconcile the storage pools when the set of known profiles changes,
-  // or a volume with an unknown profile is destroyed.
-  Future<Nothing> reconcileStoragePools();
-
   // Returns true if the storage pools are allowed to be reconciled when
   // the operation is being applied.
   static bool allowsReconciliation(const Offer::Operation& operation);
@@ -716,39 +716,63 @@ Future<Nothing>
 StorageLocalResourceProviderProcess::reconcileResourceProviderState()
 {
   return reconcileOperationStatuses()
-    .then(defer(self(), [=] {
-      return collect<vector<ResourceConversion>>(
-          {getExistingVolumes(), getStoragePools()})
-        .then(defer(self(), [=](
-            const vector<vector<ResourceConversion>>& collected) {
+    .then(defer(self(), &Self::reconcileResources));
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources()
+{
+  LOG(INFO) << "Reconciling storage pools and volumes";
+
+  return collect<vector<ResourceConversion>>(
+             {getExistingVolumes(), getStoragePools()})
+    .then(defer(
+        self(), [this](const vector<vector<ResourceConversion>>& collected) {
           Resources result = totalResources;
           foreach (const vector<ResourceConversion>& conversions, collected) {
             result = CHECK_NOTERROR(result.apply(conversions));
           }
 
+          bool shouldSendUpdate = false;
+
           if (result != totalResources) {
-            LOG(INFO)
-              << "Removing '" << (totalResources - result) << "' and adding '"
-              << (result - totalResources) << "' to the total resources";
+            LOG(INFO) << "Removing '" << (totalResources - result)
+                      << "' and adding '" << (result - totalResources)
+                      << "' to the total resources";
 
+            // Update the resource version since the total resources changed.
             totalResources = result;
+            resourceVersion = id::UUID::random();
+
             checkpointResourceProviderState();
+
+            shouldSendUpdate = true;
           }
 
-          // NOTE: Since this is the first `UPDATE_STATE` call of the
-          // current subscription, there must be no racing speculative
-          // operation, thus no need to update the resource version.
-          sendResourceProviderStateUpdate();
-          statusUpdateManager.resume();
+          switch (state) {
+            case RECOVERING:
+            case DISCONNECTED:
+            case CONNECTED:
+            case SUBSCRIBED: {
+              LOG(INFO) << "Resource provider " << info.id()
+                        << " is in READY state";
 
-          LOG(INFO)
-            << "Resource provider " << info.id() << " is in READY state";
+              state = READY;
+
+              // This is the first resource update of the current subscription.
+              shouldSendUpdate = true;
+            }
+            case READY:
+              break;
+          }
 
-          state = READY;
+          if (shouldSendUpdate) {
+            sendResourceProviderStateUpdate();
+            statusUpdateManager.resume();
+          }
 
           return Nothing();
         }));
-    }));
 }
 
 
@@ -913,44 +937,6 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses()
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::reconcileStoragePools()
-{
-  CHECK_PENDING(reconciled);
-
-  auto die = [=](const string& message) {
-    LOG(ERROR)
-      << "Failed to reconcile storage pools for resource provider " << info.id()
-      << ": " << message;
-    fatal();
-  };
-
-  return getStoragePools()
-    .then(defer(self(), [=](const vector<ResourceConversion>& conversions) {
-      Resources result = CHECK_NOTERROR(totalResources.apply(conversions));
-
-      if (result != totalResources) {
-        LOG(INFO)
-          << "Removing '" << (totalResources - result) << "' and adding '"
-          << (result - totalResources) << "' to the total resources";
-
-        totalResources = result;
-        checkpointResourceProviderState();
-
-        // NOTE: We always update the resource version before sending
-        // an `UPDATE_STATE`, so that any racing speculative operation
-        // will be rejected. Otherwise, the speculative resource
-        // conversion done on the master will be cancelled out.
-        resourceVersion = id::UUID::random();
-        sendResourceProviderStateUpdate();
-      }
-
-      return Nothing();
-    }))
-    .onFailed(defer(self(), std::bind(die, lambda::_1)))
-    .onDiscarded(defer(self(), std::bind(die, "future discarded")));
-}
-
-
 bool StorageLocalResourceProviderProcess::allowsReconciliation(
     const Offer::Operation& operation)
 {
@@ -1182,7 +1168,7 @@ void StorageLocalResourceProviderProcess::watchProfiles()
 
         std::function<Future<Nothing>()> update = defer(self(), [=] {
           return updateProfiles(profiles)
-            .then(defer(self(), &Self::reconcileStoragePools));
+            .then(defer(self(), &Self::reconcileResources));
         });
 
         // Update the profile mapping and storage pools in `sequence` to wait
@@ -1871,8 +1857,18 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
             // pending operation that disallow reconciliation to finish, and set
             // up `reconciled` to drop incoming operations that disallow
             // reconciliation until the storage pools are reconciled.
-            reconciled = sequence.add(std::function<Future<Nothing>()>(
-                defer(self(), &Self::reconcileStoragePools)));
+            auto err = [](const Resource& resource, const string& message) {
+              LOG(ERROR)
+                << "Failed to reconcile storage pools after resource "
+                << "'" << resource << "' has been freed: " << message;
+            };
+
+            reconciled =
+              sequence
+                .add(std::function<Future<Nothing>()>(
+                    defer(self(), &Self::reconcileResources)))
+                .onFailed(std::bind(err, resource, lambda::_1))
+                .onDiscard(std::bind(err, resource, "future discarded"));
           }
         }
       } else {
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index f27f5c1..7624ea1 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -5129,12 +5129,13 @@ TEST_P(StorageLocalResourceProviderTest, CsiPluginRpcMetrics)
   ASSERT_SOME(source);
 
   // We expect that the following RPC calls are made during startup: `Probe`,
-  // `GetPluginInfo` (2), `GetPluginCapabilities, `ControllerGetCapabilities`,
-  // `ListVolumes`, `GetCapacity`, `NodeGetCapabilities`, `NodeGetId`.
+  // `GetPluginInfo` (2), `GetCapacity`, `GetPluginCapabilities,
+  // `ControllerGetCapabilities`, `ListVolumes` (2), `NodeGetCapabilities`,
+  // `NodeGetId`.
   //
   // TODO(chhsiao): As these are implementation details, we should count the
   // calls processed by a mock CSI plugin and check the metrics against that.
-  const int numFinishedStartupRpcs = 9;
+  const int numFinishedStartupRpcs = 10;
 
   EXPECT_TRUE(metricEquals(
       metricName("csi_plugin/rpcs_finished"), numFinishedStartupRpcs));
@@ -5870,11 +5871,11 @@ TEST_P(StorageLocalResourceProviderTest, RetryRpcWithExponentialBackoff)
 
   // We expect that the following RPC calls are made during startup: `Probe`,
   // `GetPluginInfo` (2), `GetPluginCapabilities, `ControllerGetCapabilities`,
-  // `ListVolumes`, `GetCapacity`, `NodeGetCapabilities`, `NodeGetId`.
+  // `ListVolumes` (2), `GetCapacity`, `NodeGetCapabilities`, `NodeGetId`.
   //
   // TODO(chhsiao): As these are implementation details, we should count the
   // calls processed by a mock CSI plugin and check the metrics against that.
-  const int numFinishedStartupRpcs = 9;
+  const int numFinishedStartupRpcs = 10;
 
   EXPECT_TRUE(metricEquals(
       metricName("csi_plugin/rpcs_finished"), numFinishedStartupRpcs));


[mesos] 03/07: Updated config factory to set resource provider reconciliation interval.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 6e8ddb00025b0c353604d7d7db691fb99b53d2c6
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Wed Aug 14 09:33:23 2019 +0200

    Updated config factory to set resource provider reconciliation interval.
    
    Review: https://reviews.apache.org/r/71147/
---
 src/tests/storage_local_resource_provider_tests.cpp | 15 ++++++++++++---
 1 file changed, 12 insertions(+), 3 deletions(-)

diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 8b9009c..66c31eb 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -63,6 +63,8 @@
 
 #include "messages/messages.hpp"
 
+#include "resource_provider/constants.hpp"
+
 #include "slave/container_daemon_process.hpp"
 #include "slave/paths.hpp"
 #include "slave/state.hpp"
@@ -94,6 +96,8 @@ using mesos::internal::slave::ContainerDaemonProcess;
 using mesos::master::detector::MasterDetector;
 using mesos::master::detector::StandaloneMasterDetector;
 
+using mesos::resource_provider::DEFAULT_STORAGE_RECONCILIATION_INTERVAL;
+
 using process::Clock;
 using process::Future;
 using process::Owned;
@@ -246,7 +250,8 @@ public:
       const Option<string>& volumes = None(),
       const Option<string>& forward = None(),
       const Option<string>& createParameters = None(),
-      const Option<string>& volumeMetadata = None())
+      const Option<string>& volumeMetadata = None(),
+      const Option<Duration>& reconciliationInterval = None())
   {
     const string testCsiPluginPath =
       path::join(tests::flags.build_dir, "src", "test-csi-plugin");
@@ -304,7 +309,8 @@ public:
                   ]
                 }
               ]
-            }
+            },
+            "reconciliation_interval_seconds" : %s
           }
         }
         )~",
@@ -320,7 +326,10 @@ public:
         volumes.getOrElse(""),
         forward.isSome() ? "--forward=" + forward.get() : "",
         createParameters.getOrElse(""),
-        volumeMetadata.getOrElse(""));
+        volumeMetadata.getOrElse(""),
+        stringify(reconciliationInterval
+         .getOrElse(DEFAULT_STORAGE_RECONCILIATION_INTERVAL)
+         .secs()));
 
     ASSERT_SOME(resourceProviderConfig);
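
Editor's note: the factory change above threads a new `reconciliation_interval_seconds`
field into the generated resource provider JSON, falling back to
`DEFAULT_STORAGE_RECONCILIATION_INTERVAL` when no interval is passed (the next commit
changes that fallback for tests). A rough, self-contained sketch of the same idea, using
`std::snprintf` in place of stout's `strings::format`; the abbreviated config, type
strings, and default value here are illustrative assumptions:

    #include <cstdio>
    #include <iostream>
    #include <optional>
    #include <string>

    // Hypothetical default; the real constant lives in
    // `src/resource_provider/constants.hpp`.
    constexpr double DEFAULT_RECONCILIATION_INTERVAL_SECS = 15.0;

    std::string makeStorageConfig(
        const std::string& name,
        const std::optional<double>& reconciliationIntervalSecs = std::nullopt)
    {
      // Abbreviated config: only the fields relevant to this change are shown.
      const char* templ = R"~({
      "type": "org.apache.mesos.rp.local.storage",
      "name": "%s",
      "storage": {
        "plugin": {"type": "org.apache.mesos.csi.test"},
        "reconciliation_interval_seconds": %g
      }
    })~";

      char buffer[512];
      std::snprintf(
          buffer,
          sizeof(buffer),
          templ,
          name.c_str(),
          reconciliationIntervalSecs.value_or(
              DEFAULT_RECONCILIATION_INTERVAL_SECS));

      return std::string(buffer);
    }

    int main()
    {
      std::cout << makeStorageConfig("test") << std::endl;        // default interval
      std::cout << makeStorageConfig("test", 300.0) << std::endl; // explicit interval
      return 0;
    }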
 


[mesos] 04/07: Explicitly disabled periodic reconciliation for some provider tests.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit 3f7ab747246857e5e404bc3898ba7406e767d0e5
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Wed Aug 14 09:33:27 2019 +0200

    Explicitly disabled periodic reconciliation for some provider tests.
    
    Review: https://reviews.apache.org/r/71148/
---
 src/tests/storage_local_resource_provider_tests.cpp | 4 +---
 1 file changed, 1 insertion(+), 3 deletions(-)

diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 66c31eb..f27f5c1 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -327,9 +327,7 @@ public:
         forward.isSome() ? "--forward=" + forward.get() : "",
         createParameters.getOrElse(""),
         volumeMetadata.getOrElse(""),
-        stringify(reconciliationInterval
-         .getOrElse(DEFAULT_STORAGE_RECONCILIATION_INTERVAL)
-         .secs()));
+        stringify(reconciliationInterval.getOrElse(Seconds(0)).secs()));
 
     ASSERT_SOME(resourceProviderConfig);
 


[mesos] 07/07: Performed periodic storage local provider reconciliations.

Posted by bb...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit deeae143ec0c2cc21137058a0f848a7081f85062
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Wed Aug 14 09:33:43 2019 +0200

    Performed periodic storage local provider reconciliations.
    
    Review: https://reviews.apache.org/r/71151/
---
 src/resource_provider/storage/provider.cpp         | 120 +++++++++----
 .../storage_local_resource_provider_tests.cpp      | 195 ++++++++++++++++++++-
 2 files changed, 279 insertions(+), 36 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 2f91fe0..f180af8 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -38,6 +38,7 @@
 
 #include <mesos/v1/resource_provider.hpp>
 
+#include <process/after.hpp>
 #include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
@@ -79,6 +80,7 @@
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
 
+#include "resource_provider/constants.hpp"
 #include "resource_provider/detector.hpp"
 #include "resource_provider/state.hpp"
 
@@ -130,6 +132,7 @@ using mesos::internal::protobuf::convertLabelsToStringMap;
 using mesos::internal::protobuf::convertStringMapToLabels;
 
 using mesos::resource_provider::Call;
+using mesos::resource_provider::DEFAULT_STORAGE_RECONCILIATION_INTERVAL;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::ResourceProviderState;
 
@@ -274,8 +277,10 @@ private:
   Future<Nothing> reconcileResourceProviderState();
   Future<Nothing> reconcileOperationStatuses();
 
-  // Query the plugin for its resources and update the providers state.
-  Future<Nothing> reconcileResources();
+  // Query the plugin for its resources and update the providers
+  // state. If `alwaysUpdate` is `true` an update will always be
+  // sent, even if no changes are detected.
+  Future<Nothing> reconcileResources(bool alwaysUpdate);
 
   ResourceConversion computeConversion(
       const Resources& checkpointed, const Resources& discovered) const;
@@ -311,6 +316,13 @@ private:
       const Event::AcknowledgeOperationStatus& acknowledge);
   void reconcileOperations(const Event::ReconcileOperations& reconcile);
 
+  // Periodically poll the provider for resource changes. The poll interval is
+  // controlled by
+  // `ResourceProviderInfo.Storage.reconciliation_interval_seconds`. When this
+  // function is invoked it will perform the first poll after one reconciliation
+  // interval.
+  void watchResources();
+
   // Applies the operation. Speculative operations will be synchronously
   // applied. Do nothing if the operation is already in a terminal state.
   Future<Nothing> _applyOperation(const id::UUID& operationUuid);
@@ -373,6 +385,8 @@ private:
   const Option<string> authToken;
   const bool strict;
 
+  const Duration reconciliationInterval;
+
   shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
 
   Owned<Driver> driver;
@@ -442,6 +456,10 @@ StorageLocalResourceProviderProcess::StorageLocalResourceProviderProcess(
     slaveId(_slaveId),
     authToken(_authToken),
     strict(_strict),
+    reconciliationInterval(
+        _info.storage().has_reconciliation_interval_seconds()
+          ? Seconds(info.storage().reconciliation_interval_seconds())
+          : DEFAULT_STORAGE_RECONCILIATION_INTERVAL),
     metrics("resource_providers/" + info.type() + "." + info.name() + "/"),
     resourceVersion(id::UUID::random()),
     sequence("storage-local-resource-provider-sequence")
@@ -716,24 +734,73 @@ Future<Nothing>
 StorageLocalResourceProviderProcess::reconcileResourceProviderState()
 {
   return reconcileOperationStatuses()
-    .then(defer(self(), &Self::reconcileResources));
+    .then(defer(self(), &Self::reconcileResources, true))
+    .then(defer(self(), [this] {
+      statusUpdateManager.resume();
+
+      switch (state) {
+        case RECOVERING:
+        case DISCONNECTED:
+        case CONNECTED:
+        case SUBSCRIBED: {
+          LOG(INFO) << "Resource provider " << info.id()
+                    << " is in READY state";
+
+          state = READY;
+        }
+        case READY:
+          break;
+      }
+
+      return Nothing();
+    }));
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources()
+void StorageLocalResourceProviderProcess::watchResources()
+{
+  // A specified reconciliation interval of zero
+  // denotes disabled periodic reconciliations.
+  if (reconciliationInterval == Seconds(0)) {
+    return;
+  }
+
+  CHECK(info.has_id());
+
+  loop(
+      self(),
+      std::bind(&process::after, reconciliationInterval),
+      [this](const Nothing&) {
+        // Poll resource provider state in `sequence` to
+        // prevent concurrent non-reconcilable operations.
+        reconciled = sequence.add(std::function<Future<Nothing>()>(
+            defer(self(), &Self::reconcileResources, false)));
+
+        return reconciled.then(
+            [](const Nothing&) -> ControlFlow<Nothing> { return Continue(); });
+      });
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources(
+    bool alwaysUpdate)
 {
   LOG(INFO) << "Reconciling storage pools and volumes";
 
+  CHECK_PENDING(reconciled);
+
   return collect<vector<ResourceConversion>>(
              {getExistingVolumes(), getStoragePools()})
     .then(defer(
-        self(), [this](const vector<vector<ResourceConversion>>& collected) {
+        self(),
+        [alwaysUpdate, this]
+        (const vector<vector<ResourceConversion>>& collected) {
           Resources result = totalResources;
           foreach (const vector<ResourceConversion>& conversions, collected) {
             result = CHECK_NOTERROR(result.apply(conversions));
           }
 
-          bool shouldSendUpdate = false;
+          bool shouldSendUpdate = alwaysUpdate;
 
           if (result != totalResources) {
             LOG(INFO) << "Removing '" << (totalResources - result)
@@ -742,33 +809,14 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources()
 
             // Update the resource version since the total resources changed.
             totalResources = result;
-            resourceVersion = id::UUID::random();
 
             checkpointResourceProviderState();
 
             shouldSendUpdate = true;
           }
 
-          switch (state) {
-            case RECOVERING:
-            case DISCONNECTED:
-            case CONNECTED:
-            case SUBSCRIBED: {
-              LOG(INFO) << "Resource provider " << info.id()
-                        << " is in READY state";
-
-              state = READY;
-
-              // This is the first resource update of the current subscription.
-              shouldSendUpdate = true;
-            }
-            case READY:
-              break;
-          }
-
           if (shouldSendUpdate) {
             sendResourceProviderStateUpdate();
-            statusUpdateManager.resume();
           }
 
           return Nothing();
@@ -1168,7 +1216,7 @@ void StorageLocalResourceProviderProcess::watchProfiles()
 
         std::function<Future<Nothing>()> update = defer(self(), [=] {
           return updateProfiles(profiles)
-            .then(defer(self(), &Self::reconcileResources));
+            .then(defer(self(), &Self::reconcileResources, false));
         });
 
         // Update the profile mapping and storage pools in `sequence` to wait
@@ -1261,10 +1309,12 @@ void StorageLocalResourceProviderProcess::subscribed(
   // Reconcile resources after obtaining the resource provider ID and start
   // watching for profile changes after the reconciliation.
   // TODO(chhsiao): Reconcile and watch for profile changes early.
-  reconciled = reconcileResourceProviderState()
-    .onReady(defer(self(), &Self::watchProfiles))
-    .onFailed(defer(self(), std::bind(die, lambda::_1)))
-    .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+  reconciled =
+    reconcileResourceProviderState()
+      .onReady(defer(self(), &Self::watchProfiles))
+      .onReady(defer(self(), &Self::watchResources))
+      .onFailed(defer(self(), std::bind(die, lambda::_1)))
+      .onDiscarded(defer(self(), std::bind(die, "future discarded")));
 }
 
 
@@ -1728,7 +1778,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
 
   // TODO(chhsiao): Consider calling `createVolume` sequentially with other
   // create or delete operations, and send an `UPDATE_STATE` for storage pools
-  // afterward. See MESOS-9254.
+  // afterward.
   Future<VolumeInfo> created;
   if (resource.disk().source().has_profile()) {
     created = volumeManager->createVolume(
@@ -1866,7 +1916,7 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
             reconciled =
               sequence
                 .add(std::function<Future<Nothing>()>(
-                    defer(self(), &Self::reconcileResources)))
+                    defer(self(), &Self::reconcileResources, false)))
                 .onFailed(std::bind(err, resource, lambda::_1))
                 .onDiscard(std::bind(err, resource, "future discarded"));
           }
@@ -2123,6 +2173,12 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
 
 void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
 {
+  // Set a new resource version here since we typically send state
+  // updates when resources change. While this ensures we always have
+  // a new resource version whenever we set new state, with that this
+  // function is not idempotent anymore.
+  resourceVersion = id::UUID::random();
+
   Call call;
   call.set_type(Call::UPDATE_STATE);
   call.mutable_resource_provider_id()->CopyFrom(info.id());
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 7624ea1..05daf2a 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -96,8 +96,6 @@ using mesos::internal::slave::ContainerDaemonProcess;
 using mesos::master::detector::MasterDetector;
 using mesos::master::detector::StandaloneMasterDetector;
 
-using mesos::resource_provider::DEFAULT_STORAGE_RECONCILIATION_INTERVAL;
-
 using process::Clock;
 using process::Future;
 using process::Owned;
@@ -1643,8 +1641,6 @@ TEST_P(StorageLocalResourceProviderTest, ProfileDisappeared)
   // The resource provider will reconcile the storage pools to reclaim the
   // space freed by destroying a MOUNT disk of a disappeared profile, which
   // would in turn trigger another agent update and thus another allocation.
-  //
-  // TODO(chhsiao): This might change once MESOS-9254 is done.
   AWAIT_READY(offers);
   ASSERT_EQ(1, offers->offers_size());
 
@@ -6708,6 +6704,197 @@ TEST_P(
   }
 }
 
+
+// This test validates that the SLRP periodically
+// reconciles resources with the CSI plugin.
+TEST_P(StorageLocalResourceProviderTest, Update)
+{
+  Clock::pause();
+
+  const string profilesPath = path::join(sandbox.get(), "profiles.json");
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
+  loadUriDiskProfileAdaptorModule(profilesPath);
+
+  const string mockCsiEndpoint =
+    "unix://" + path::join(sandbox.get(), "mock_csi.sock");
+
+  MockCSIPlugin plugin;
+  ASSERT_SOME(plugin.startup(mockCsiEndpoint));
+
+  constexpr Duration reconciliationInterval = Seconds(15);
+
+  setupResourceProviderConfig(
+      Bytes(0),
+      None(),
+      mockCsiEndpoint,
+      None(),
+      None(),
+      reconciliationInterval);
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  process::Queue<Nothing> getCapacityCalls;
+  process::Queue<Nothing> listVolumesCalls;
+  if (GetParam() == csi::v0::API_VERSION) {
+    EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v0::ListVolumesResponse*>()))
+      .WillOnce(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v0::ListVolumesRequest* request,
+          csi::v0::ListVolumesResponse* response) {
+        listVolumesCalls.put({});
+        return grpc::Status::OK;
+      }))
+      .WillRepeatedly(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v0::ListVolumesRequest* request,
+          csi::v0::ListVolumesResponse* response) {
+        csi::v0::Volume* volume = response->add_entries()->mutable_volume();
+        volume->set_capacity_bytes(Bytes(1024).bytes());
+        volume->set_id("volume1");
+
+        listVolumesCalls.put({});
+        return grpc::Status::OK;
+      }));
+
+    EXPECT_CALL(plugin, GetCapacity(_, _, A<csi::v0::GetCapacityResponse*>()))
+      .WillOnce(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v0::GetCapacityRequest* request,
+          csi::v0::GetCapacityResponse* response) {
+        getCapacityCalls.put({});
+        return grpc::Status::OK;
+      }))
+      .WillRepeatedly(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v0::GetCapacityRequest* request,
+          csi::v0::GetCapacityResponse* response) {
+        response->set_available_capacity(Bytes(1024).bytes());
+
+        getCapacityCalls.put({});
+        return grpc::Status::OK;
+      }));
+  } else if (GetParam() == csi::v1::API_VERSION) {
+    EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v1::ListVolumesResponse*>()))
+      .WillOnce(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v1::ListVolumesRequest* request,
+          csi::v1::ListVolumesResponse* response) {
+        listVolumesCalls.put({});
+        return grpc::Status::OK;
+      }))
+      .WillRepeatedly(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v1::ListVolumesRequest* request,
+          csi::v1::ListVolumesResponse* response) {
+        csi::v1::Volume* volume = response->add_entries()->mutable_volume();
+        volume->set_capacity_bytes(Bytes(1024).bytes());
+        volume->set_volume_id("volume1");
+
+        listVolumesCalls.put({});
+        return grpc::Status::OK;
+      }));
+
+    EXPECT_CALL(plugin, GetCapacity(_, _, A<csi::v1::GetCapacityResponse*>()))
+      .WillOnce(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v1::GetCapacityRequest* request,
+          csi::v1::GetCapacityResponse* response) {
+        getCapacityCalls.put({});
+        return grpc::Status::OK;
+      }))
+      .WillRepeatedly(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v1::GetCapacityRequest* request,
+          csi::v1::GetCapacityResponse* response) {
+        response->set_available_capacity(Bytes(1024).bytes());
+
+        getCapacityCalls.put({});
+        return grpc::Status::OK;
+      }));
+  }
+
+  Future<Nothing> listVolumes1 = listVolumesCalls.get();
+  Future<Nothing> listVolumes2 = listVolumesCalls.get();
+
+  Future<Nothing> getCapacity1 = getCapacityCalls.get();
+  Future<Nothing> getCapacity2 = getCapacityCalls.get();
+
+
+  // Since the local resource provider daemon gets subscribed after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider. After that a single update
+  // will be send since they underlying provider resources got changed.
+  //
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave3 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration and prevent retry.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave1);
+  ASSERT_TRUE(updateSlave1->has_resource_providers());
+  ASSERT_TRUE(updateSlave1->resource_providers().providers().empty());
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // periodically check if the CSI endpoint socket has been created by
+  // the plugin container, which runs in another Linux process.
+  Clock::resume();
+
+  AWAIT_READY(getCapacity1);
+  AWAIT_READY(listVolumes1);
+
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+  ASSERT_FALSE(updateSlave2->resource_providers().providers().empty());
+
+  Clock::pause();
+
+  // Advance the clock so the SLRP polls for volume and storage pool updates.
+  Clock::settle();
+  Clock::advance(reconciliationInterval);
+
+  AWAIT_READY(listVolumes2);
+  AWAIT_READY(getCapacity2);
+  ASSERT_TRUE(updateSlave3.isPending());
+
+  // Advance the clock so the SLRP polls again.
+  Future<Nothing> listVolumes3 = listVolumesCalls.get();
+  Future<Nothing> getCapacity3 = getCapacityCalls.get();
+
+  Clock::settle();
+  Clock::advance(reconciliationInterval);
+
+  AWAIT_READY(listVolumes3);
+  AWAIT_READY(getCapacity3);
+  AWAIT_READY(updateSlave3);
+  ASSERT_TRUE(updateSlave3->has_resource_providers());
+  ASSERT_FALSE(updateSlave3->resource_providers().providers().empty());
+
+  // Resource changes are reported and the resource version changes.
+  ASSERT_NE(
+      updateSlave2->resource_providers().providers(0).resource_version_uuid(),
+      updateSlave3->resource_providers().providers(0).resource_version_uuid());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {
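
Editor's note: the new `watchResources` above drives the periodic poll with
`process::loop` and `process::after`: every reconciliation interval it queues
`reconcileResources(false)` into the provider's operation `sequence`, and an interval of
zero disables the loop entirely. A rough standard-library analogue of that polling
pattern (a plain thread and sleeps instead of libprocess actors and a `Sequence`; all
names here are illustrative):

    #include <atomic>
    #include <chrono>
    #include <functional>
    #include <iostream>
    #include <thread>

    // Periodically invokes `reconcile` until `stop` becomes true. An interval
    // of zero disables periodic reconciliation, mirroring the provider's
    // zero-interval sentinel.
    void watchResources(
        std::chrono::seconds interval,
        const std::function<void()>& reconcile,
        const std::atomic<bool>& stop)
    {
      if (interval == std::chrono::seconds(0)) {
        return; // Periodic reconciliation disabled.
      }

      while (!stop) {
        std::this_thread::sleep_for(interval);

        if (stop) {
          break;
        }

        // The real provider adds this call to a `Sequence` so it cannot
        // interleave with non-reconcilable operations; here we just call it.
        reconcile();
      }
    }

    int main()
    {
      std::atomic<bool> stop{false};

      std::thread poller(
          watchResources,
          std::chrono::seconds(1),
          std::function<void()>([]() {
            std::cout << "Reconciling storage pools and volumes" << std::endl;
          }),
          std::cref(stop));

      std::this_thread::sleep_for(std::chrono::seconds(3));
      stop = true;
      poller.join();

      return 0;
    }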