You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2019/08/20 10:02:37 UTC

[mesos] 07/07: Performed periodic storage local provider reconciliations.

This is an automated email from the ASF dual-hosted git repository.

bbannier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git

commit deeae143ec0c2cc21137058a0f848a7081f85062
Author: Benjamin Bannier <bb...@apache.org>
AuthorDate: Wed Aug 14 09:33:43 2019 +0200

    Performed periodic storage local provider reconciliations.
    
    Review: https://reviews.apache.org/r/71151/
---
 src/resource_provider/storage/provider.cpp         | 120 +++++++++----
 .../storage_local_resource_provider_tests.cpp      | 195 ++++++++++++++++++++-
 2 files changed, 279 insertions(+), 36 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index 2f91fe0..f180af8 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -38,6 +38,7 @@
 
 #include <mesos/v1/resource_provider.hpp>
 
+#include <process/after.hpp>
 #include <process/collect.hpp>
 #include <process/defer.hpp>
 #include <process/delay.hpp>
@@ -79,6 +80,7 @@
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
 
+#include "resource_provider/constants.hpp"
 #include "resource_provider/detector.hpp"
 #include "resource_provider/state.hpp"
 
@@ -130,6 +132,7 @@ using mesos::internal::protobuf::convertLabelsToStringMap;
 using mesos::internal::protobuf::convertStringMapToLabels;
 
 using mesos::resource_provider::Call;
+using mesos::resource_provider::DEFAULT_STORAGE_RECONCILIATION_INTERVAL;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::ResourceProviderState;
 
@@ -274,8 +277,10 @@ private:
   Future<Nothing> reconcileResourceProviderState();
   Future<Nothing> reconcileOperationStatuses();
 
-  // Query the plugin for its resources and update the providers state.
-  Future<Nothing> reconcileResources();
+  // Query the plugin for its resources and update the provider's state.
+  // A state update is sent if changes are detected, or always if
+  // `alwaysUpdate` is `true`. `reconciled` must be pending when called.
+  Future<Nothing> reconcileResources(bool alwaysUpdate);
 
   ResourceConversion computeConversion(
       const Resources& checkpointed, const Resources& discovered) const;
@@ -311,6 +316,13 @@ private:
       const Event::AcknowledgeOperationStatus& acknowledge);
   void reconcileOperations(const Event::ReconcileOperations& reconcile);
 
+  // Periodically poll the provider for resource changes. The poll interval is
+  // `ResourceProviderInfo.Storage.reconciliation_interval_seconds` if set,
+  // else `DEFAULT_STORAGE_RECONCILIATION_INTERVAL`; zero disables polling.
+  // When this function is invoked it will perform the first poll after one
+  // reconciliation interval.
+  void watchResources();
+
   // Applies the operation. Speculative operations will be synchronously
   // applied. Do nothing if the operation is already in a terminal state.
   Future<Nothing> _applyOperation(const id::UUID& operationUuid);
@@ -373,6 +385,8 @@ private:
   const Option<string> authToken;
   const bool strict;
 
+  const Duration reconciliationInterval;
+
   shared_ptr<DiskProfileAdaptor> diskProfileAdaptor;
 
   Owned<Driver> driver;
@@ -442,6 +456,10 @@ StorageLocalResourceProviderProcess::StorageLocalResourceProviderProcess(
     slaveId(_slaveId),
     authToken(_authToken),
     strict(_strict),
+    reconciliationInterval(
+        _info.storage().has_reconciliation_interval_seconds()
+          ? Seconds(info.storage().reconciliation_interval_seconds())
+          : DEFAULT_STORAGE_RECONCILIATION_INTERVAL),
     metrics("resource_providers/" + info.type() + "." + info.name() + "/"),
     resourceVersion(id::UUID::random()),
     sequence("storage-local-resource-provider-sequence")
@@ -716,24 +734,73 @@ Future<Nothing>
 StorageLocalResourceProviderProcess::reconcileResourceProviderState()
 {
   return reconcileOperationStatuses()
-    .then(defer(self(), &Self::reconcileResources));
+    .then(defer(self(), &Self::reconcileResources, true))
+    .then(defer(self(), [this] {
+      statusUpdateManager.resume();
+
+      switch (state) {
+        case RECOVERING:
+        case DISCONNECTED:
+        case CONNECTED:
+        case SUBSCRIBED: {
+          LOG(INFO) << "Resource provider " << info.id()
+                    << " is in READY state";
+
+          state = READY;
+        }
+        case READY:
+          break;
+      }
+
+      return Nothing();
+    }));
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources()
+void StorageLocalResourceProviderProcess::watchResources()
+{
+  // A reconciliation interval of zero disables periodic reconciliation;
+  // otherwise each poll starts one interval after the previous one finished.
+  if (reconciliationInterval == Seconds(0)) {
+    return;
+  }
+
+  CHECK(info.has_id());
+
+  loop(
+      self(),
+      std::bind(&process::after, reconciliationInterval),
+      [this](const Nothing&) {
+        // Run the poll through `sequence` so it is serialized with other work
+        // queued there (e.g., profile updates and disk destroys).
+        reconciled = sequence.add(std::function<Future<Nothing>()>(
+            defer(self(), &Self::reconcileResources, false)));
+        // NOTE(review): a failed poll presumably ends this loop silently.
+        return reconciled.then(
+            [](const Nothing&) -> ControlFlow<Nothing> { return Continue(); });
+      });
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources(
+    bool alwaysUpdate)
 {
   LOG(INFO) << "Reconciling storage pools and volumes";
 
+  CHECK_PENDING(reconciled);
+
   return collect<vector<ResourceConversion>>(
              {getExistingVolumes(), getStoragePools()})
     .then(defer(
-        self(), [this](const vector<vector<ResourceConversion>>& collected) {
+        self(),
+        [alwaysUpdate, this]
+        (const vector<vector<ResourceConversion>>& collected) {
           Resources result = totalResources;
           foreach (const vector<ResourceConversion>& conversions, collected) {
             result = CHECK_NOTERROR(result.apply(conversions));
           }
 
-          bool shouldSendUpdate = false;
+          bool shouldSendUpdate = alwaysUpdate;
 
           if (result != totalResources) {
             LOG(INFO) << "Removing '" << (totalResources - result)
@@ -742,33 +809,14 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcileResources()
 
             // Update the resource version since the total resources changed.
             totalResources = result;
-            resourceVersion = id::UUID::random();
 
             checkpointResourceProviderState();
 
             shouldSendUpdate = true;
           }
 
-          switch (state) {
-            case RECOVERING:
-            case DISCONNECTED:
-            case CONNECTED:
-            case SUBSCRIBED: {
-              LOG(INFO) << "Resource provider " << info.id()
-                        << " is in READY state";
-
-              state = READY;
-
-              // This is the first resource update of the current subscription.
-              shouldSendUpdate = true;
-            }
-            case READY:
-              break;
-          }
-
           if (shouldSendUpdate) {
             sendResourceProviderStateUpdate();
-            statusUpdateManager.resume();
           }
 
           return Nothing();
@@ -1168,7 +1216,7 @@ void StorageLocalResourceProviderProcess::watchProfiles()
 
         std::function<Future<Nothing>()> update = defer(self(), [=] {
           return updateProfiles(profiles)
-            .then(defer(self(), &Self::reconcileResources));
+            .then(defer(self(), &Self::reconcileResources, false));
         });
 
         // Update the profile mapping and storage pools in `sequence` to wait
@@ -1261,10 +1309,12 @@ void StorageLocalResourceProviderProcess::subscribed(
   // Reconcile resources after obtaining the resource provider ID and start
   // watching for profile changes after the reconciliation.
   // TODO(chhsiao): Reconcile and watch for profile changes early.
-  reconciled = reconcileResourceProviderState()
-    .onReady(defer(self(), &Self::watchProfiles))
-    .onFailed(defer(self(), std::bind(die, lambda::_1)))
-    .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+  reconciled =
+    reconcileResourceProviderState()
+      .onReady(defer(self(), &Self::watchProfiles))
+      .onReady(defer(self(), &Self::watchResources))
+      .onFailed(defer(self(), std::bind(die, lambda::_1)))
+      .onDiscarded(defer(self(), std::bind(die, "future discarded")));
 }
 
 
@@ -1728,7 +1778,7 @@ StorageLocalResourceProviderProcess::applyCreateDisk(
 
   // TODO(chhsiao): Consider calling `createVolume` sequentially with other
   // create or delete operations, and send an `UPDATE_STATE` for storage pools
-  // afterward. See MESOS-9254.
+  // afterward.
   Future<VolumeInfo> created;
   if (resource.disk().source().has_profile()) {
     created = volumeManager->createVolume(
@@ -1866,7 +1916,7 @@ StorageLocalResourceProviderProcess::applyDestroyDisk(
             reconciled =
               sequence
                 .add(std::function<Future<Nothing>()>(
-                    defer(self(), &Self::reconcileResources)))
+                    defer(self(), &Self::reconcileResources, false)))
                 .onFailed(std::bind(err, resource, lambda::_1))
                 .onDiscard(std::bind(err, resource, "future discarded"));
           }
@@ -2123,6 +2173,12 @@ void StorageLocalResourceProviderProcess::checkpointResourceProviderState()
 
 void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
 {
+  // Set a new resource version here since we typically send state
+  // updates when resources change. While this ensures we always have
+  // a new resource version whenever we set new state, with that this
+  // function is not idempotent anymore.
+  resourceVersion = id::UUID::random();
+
   Call call;
   call.set_type(Call::UPDATE_STATE);
   call.mutable_resource_provider_id()->CopyFrom(info.id());
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 7624ea1..05daf2a 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -96,8 +96,6 @@ using mesos::internal::slave::ContainerDaemonProcess;
 using mesos::master::detector::MasterDetector;
 using mesos::master::detector::StandaloneMasterDetector;
 
-using mesos::resource_provider::DEFAULT_STORAGE_RECONCILIATION_INTERVAL;
-
 using process::Clock;
 using process::Future;
 using process::Owned;
@@ -1643,8 +1641,6 @@ TEST_P(StorageLocalResourceProviderTest, ProfileDisappeared)
   // The resource provider will reconcile the storage pools to reclaim the
   // space freed by destroying a MOUNT disk of a disappeared profile, which
   // would in turn trigger another agent update and thus another allocation.
-  //
-  // TODO(chhsiao): This might change once MESOS-9254 is done.
   AWAIT_READY(offers);
   ASSERT_EQ(1, offers->offers_size());
 
@@ -6708,6 +6704,197 @@ TEST_P(
   }
 }
 
+
+// This test validates that the SLRP periodically
+// reconciles resources with the CSI plugin.
+TEST_P(StorageLocalResourceProviderTest, Update)
+{
+  Clock::pause();
+
+  const string profilesPath = path::join(sandbox.get(), "profiles.json");
+
+  ASSERT_SOME(
+      os::write(profilesPath, createDiskProfileMapping({{"test", None()}})));
+
+  loadUriDiskProfileAdaptorModule(profilesPath);
+
+  const string mockCsiEndpoint =
+    "unix://" + path::join(sandbox.get(), "mock_csi.sock");
+
+  MockCSIPlugin plugin;
+  ASSERT_SOME(plugin.startup(mockCsiEndpoint));
+
+  constexpr Duration reconciliationInterval = Seconds(15);
+
+  setupResourceProviderConfig(
+      Bytes(0),
+      None(),
+      mockCsiEndpoint,
+      None(),
+      None(),
+      reconciliationInterval);
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  process::Queue<Nothing> getCapacityCalls;
+  process::Queue<Nothing> listVolumesCalls;
+  if (GetParam() == csi::v0::API_VERSION) {
+    EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v0::ListVolumesResponse*>()))
+      .WillOnce(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v0::ListVolumesRequest* request,
+          csi::v0::ListVolumesResponse* response) {
+        listVolumesCalls.put({});
+        return grpc::Status::OK;
+      }))
+      .WillRepeatedly(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v0::ListVolumesRequest* request,
+          csi::v0::ListVolumesResponse* response) {
+        csi::v0::Volume* volume = response->add_entries()->mutable_volume();
+        volume->set_capacity_bytes(Bytes(1024).bytes());
+        volume->set_id("volume1");
+
+        listVolumesCalls.put({});
+        return grpc::Status::OK;
+      }));
+
+    EXPECT_CALL(plugin, GetCapacity(_, _, A<csi::v0::GetCapacityResponse*>()))
+      .WillOnce(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v0::GetCapacityRequest* request,
+          csi::v0::GetCapacityResponse* response) {
+        getCapacityCalls.put({});
+        return grpc::Status::OK;
+      }))
+      .WillRepeatedly(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v0::GetCapacityRequest* request,
+          csi::v0::GetCapacityResponse* response) {
+        response->set_available_capacity(Bytes(1024).bytes());
+
+        getCapacityCalls.put({});
+        return grpc::Status::OK;
+      }));
+  } else if (GetParam() == csi::v1::API_VERSION) {
+    EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v1::ListVolumesResponse*>()))
+      .WillOnce(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v1::ListVolumesRequest* request,
+          csi::v1::ListVolumesResponse* response) {
+        listVolumesCalls.put({});
+        return grpc::Status::OK;
+      }))
+      .WillRepeatedly(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v1::ListVolumesRequest* request,
+          csi::v1::ListVolumesResponse* response) {
+        csi::v1::Volume* volume = response->add_entries()->mutable_volume();
+        volume->set_capacity_bytes(Bytes(1024).bytes());
+        volume->set_volume_id("volume1");
+
+        listVolumesCalls.put({});
+        return grpc::Status::OK;
+      }));
+
+    EXPECT_CALL(plugin, GetCapacity(_, _, A<csi::v1::GetCapacityResponse*>()))
+      .WillOnce(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v1::GetCapacityRequest* request,
+          csi::v1::GetCapacityResponse* response) {
+        getCapacityCalls.put({});
+        return grpc::Status::OK;
+      }))
+      .WillRepeatedly(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v1::GetCapacityRequest* request,
+          csi::v1::GetCapacityResponse* response) {
+        response->set_available_capacity(Bytes(1024).bytes());
+
+        getCapacityCalls.put({});
+        return grpc::Status::OK;
+      }));
+  }
+
+  Future<Nothing> listVolumes1 = listVolumesCalls.get();
+  Future<Nothing> listVolumes2 = listVolumesCalls.get();
+
+  Future<Nothing> getCapacity1 = getCapacityCalls.get();
+  Future<Nothing> getCapacity2 = getCapacityCalls.get();
+
+
+  // Since the local resource provider daemon gets subscribed after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider. After that a single update
+  // will be sent since the underlying provider resources got changed.
+  //
+  // NOTE: The order of the three `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave3 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration and prevent retry.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave1);
+  ASSERT_TRUE(updateSlave1->has_resource_providers());
+  ASSERT_TRUE(updateSlave1->resource_providers().providers().empty());
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // periodically check if the CSI endpoint socket has been created by
+  // the plugin container, which runs in another Linux process.
+  Clock::resume();
+
+  AWAIT_READY(getCapacity1);
+  AWAIT_READY(listVolumes1);
+
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+  ASSERT_FALSE(updateSlave2->resource_providers().providers().empty());
+
+  Clock::pause();
+
+  // Advance the clock so the SLRP polls for volume and storage pool updates.
+  Clock::settle();
+  Clock::advance(reconciliationInterval);
+
+  AWAIT_READY(listVolumes2);
+  AWAIT_READY(getCapacity2);
+  ASSERT_TRUE(updateSlave3.isPending());
+
+  // Advance the clock so the SLRP polls again.
+  Future<Nothing> listVolumes3 = listVolumesCalls.get();
+  Future<Nothing> getCapacity3 = getCapacityCalls.get();
+
+  Clock::settle();
+  Clock::advance(reconciliationInterval);
+
+  AWAIT_READY(listVolumes3);
+  AWAIT_READY(getCapacity3);
+  AWAIT_READY(updateSlave3);
+  ASSERT_TRUE(updateSlave3->has_resource_providers());
+  ASSERT_FALSE(updateSlave3->resource_providers().providers().empty());
+
+  // Resource changes are reported and the resource version changes.
+  ASSERT_NE(
+      updateSlave2->resource_providers().providers(0).resource_version_uuid(),
+      updateSlave3->resource_providers().providers(0).resource_version_uuid());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {