You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/07/13 21:41:09 UTC

[04/17] mesos git commit: Reconciled storage pools when destroying volumes with stale profiles.

Reconciled storage pools when destroying volumes with stale profiles.

The storage pools need to be reconciled in the following two scenarios:

1. When there is a change in the set of known profiles.
2. When a volume/block whose profile has disappeared is destroyed, because
   the freed disk space may now belong to a newly appeared profile.

This patch serializes the reconciliations triggered by the above two
cases so that they never run concurrently.

Review: https://reviews.apache.org/r/65975


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/16d19ac3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/16d19ac3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/16d19ac3

Branch: refs/heads/master
Commit: 16d19ac37dc3b7954bba5b7860d0899a5dc468e6
Parents: 1418686
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed Mar 7 20:35:57 2018 -0800
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Fri Jul 13 14:30:06 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 388 ++++++++++++++----------
 1 file changed, 232 insertions(+), 156 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/16d19ac3/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index b90a4b8..07645fc 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -98,6 +98,7 @@ using process::Sequence;
 using process::Timeout;
 
 using process::after;
+using process::await;
 using process::collect;
 using process::defer;
 using process::loop;
@@ -308,9 +309,8 @@ public:
       slaveId(_slaveId),
       authToken(_authToken),
       strict(_strict),
-      reconciling(false),
       resourceVersion(id::UUID::random()),
-      operationSequence("operation-sequence"),
+      sequence("storage-local-resource-provider-sequence"),
       metrics("resource_providers/" + info.type() + "." + info.name() + "/")
   {
     diskProfileAdaptor = DiskProfileAdaptor::getAdaptor();
@@ -361,12 +361,23 @@ private:
       const Resources& checkpointed,
       const Resources& discovered);
 
-  // Helper for updating the profiles mapping upon receiving an updated
-  // set of profiles from the DiskProfileAdaptor module.
-  Future<Nothing> updateProfiles();
+  // Spawns a loop to watch for changes in the set of known profiles and update
+  // the profile mapping and storage pools accordingly.
+  void watchProfiles();
 
-  // Reconcile the storage pools upon profile updates.
-  Future<Nothing> reconcileProfileUpdates();
+  // Update the profile mapping when the set of known profiles changes.
+  // NOTE: This function never fails. If it fails to translate a new
+  // profile, the resource provider will continue to operate with the
+  // set of profiles it knows about.
+  Future<Nothing> updateProfiles(const hashset<string>& profiles);
+
+  // Reconcile the storage pools when the set of known profiles changes,
+  // or a volume with an unknown profile is destroyed.
+  Future<Nothing> reconcileStoragePools();
+
+  // Returns true if the storage pools are allowed to be reconciled when
+  // the operation is being applied.
+  static bool allowsReconciliation(const Offer::Operation& operation);
 
   // Functions for received events.
   void subscribed(const Event::Subscribed& subscribed);
@@ -465,12 +476,6 @@ private:
   // The mapping of known profiles fetched from the DiskProfileAdaptor.
   hashmap<string, DiskProfileAdaptor::ProfileInfo> profileInfos;
 
-  // The last set of profile names fetched from the DiskProfileAdaptor.
-  hashset<string> knownProfiles;
-
-  // True if a reconciliation of storage pools is happening.
-  bool reconciling;
-
   hashmap<ContainerID, Owned<ContainerDaemon>> daemons;
   hashmap<ContainerID, Owned<Promise<csi::v0::Client>>> services;
 
@@ -493,10 +498,14 @@ private:
   id::UUID resourceVersion;
   hashmap<string, VolumeData> volumes;
 
-  // We maintain a sequence to keep track of ongoing volume/block
-  // creation or destroy. These operations will not be sequentialized
-  // through the sequence. It is simply used to wait for them to finish.
-  Sequence operationSequence;
+  // If pending, it means that the storage pools are being reconciled, and all
+  // incoming operations that disallow reconciliation will be dropped.
+  Future<Nothing> reconciled;
+
+  // We maintain a sequence to coordinate reconciliations of storage pools. It
+  // keeps track of pending operations that disallow reconciliation, and ensures
+  // that any reconciliation waits for these operations to finish.
+  Sequence sequence;
 
   struct Metrics
   {
@@ -664,30 +673,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
 
       statusUpdateManager.pause();
 
-      auto err = [](const string& message) {
-        LOG(ERROR)
-          << "Failed to watch for DiskProfileAdaptor: " << message;
-      };
-
-      // Start watching the DiskProfileAdaptor.
-      // TODO(chhsiao): Consider retrying with backoff.
-      loop(
-          self(),
-          [=] {
-            return diskProfileAdaptor->watch(knownProfiles, info)
-              .then(defer(self(), [=](const hashset<string>& profiles) {
-                // Save the returned set of profiles so that we
-                // can watch the module for changes to it.
-                knownProfiles = profiles;
-
-                return updateProfiles()
-                  .then(defer(self(), &Self::reconcileProfileUpdates));
-              }));
-          },
-          [](Nothing) -> ControlFlow<Nothing> { return Continue(); })
-        .onFailed(std::bind(err, lambda::_1))
-        .onDiscarded(std::bind(err, "future discarded"));
-
       driver.reset(new Driver(
           Owned<EndpointDetector>(new ConstantEndpointDetector(url)),
           contentType,
@@ -1271,6 +1256,92 @@ StorageLocalResourceProviderProcess::reconcileOperationStatuses()
 }
 
 
+Future<Nothing> StorageLocalResourceProviderProcess::reconcileStoragePools()
+{
+  CHECK_PENDING(reconciled);
+
+  auto die = [=](const string& message) {
+    LOG(ERROR)
+      << "Failed to reconcile storage pools for resource provider " << info.id()
+      << ": " << message;
+    fatal();
+  };
+
+  return getCapacities()
+    .then(defer(self(), [=](const Resources& discovered) {
+      ResourceConversion conversion = reconcileResources(
+          totalResources.filter(
+              [](const Resource& r) { return !r.disk().source().has_id(); }),
+          discovered);
+
+      Try<Resources> result = totalResources.apply(conversion);
+      CHECK_SOME(result);
+
+      if (result.get() != totalResources) {
+        LOG(INFO)
+          << "Removing '" << conversion.consumed << "' and adding '"
+          << conversion.converted << "' to the total resources";
+
+        totalResources = result.get();
+        checkpointResourceProviderState();
+
+        // NOTE: We always update the resource version before sending
+        // an `UPDATE_STATE`, so that any racing speculative operation
+        // will be rejected. Otherwise, the speculative resource
+        // conversion done on the master will be cancelled out.
+        resourceVersion = id::UUID::random();
+        sendResourceProviderStateUpdate();
+      }
+
+      return Nothing();
+    }))
+    .onFailed(defer(self(), std::bind(die, lambda::_1)))
+    .onDiscarded(defer(self(), std::bind(die, "future discarded")));
+}
+
+
+bool StorageLocalResourceProviderProcess::allowsReconciliation(
+    const Offer::Operation& operation)
+{
+  switch (operation.type()) {
+    case Offer::Operation::RESERVE:
+    case Offer::Operation::UNRESERVE: {
+      Resources consumedStoragePools =
+        CHECK_NOTERROR(protobuf::getConsumedResources(operation))
+          .filter([](const Resource& r) {
+            return r.disk().source().has_profile() &&
+              r.disk().source().type() == Resource::DiskInfo::Source::RAW;
+          });
+
+      return consumedStoragePools.empty();
+    }
+    case Offer::Operation::CREATE:
+    case Offer::Operation::DESTROY: {
+      return true;
+    }
+    case Offer::Operation::CREATE_VOLUME:
+    case Offer::Operation::DESTROY_VOLUME:
+    case Offer::Operation::CREATE_BLOCK:
+    case Offer::Operation::DESTROY_BLOCK: {
+      return false;
+    }
+    case Offer::Operation::GROW_VOLUME:
+    case Offer::Operation::SHRINK_VOLUME: {
+      // TODO(chhsiao): These operations are currently not supported for
+      // resource providers, and should have been validated by the master.
+      UNREACHABLE();
+    }
+    case Offer::Operation::UNKNOWN:
+    case Offer::Operation::LAUNCH:
+    case Offer::Operation::LAUNCH_GROUP: {
+      UNREACHABLE();
+    }
+  }
+
+  UNREACHABLE();
+}
+
+
 ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
     const Resources& checkpointed,
     const Resources& discovered)
@@ -1319,93 +1390,80 @@ ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
 }
 
 
-Future<Nothing> StorageLocalResourceProviderProcess::updateProfiles()
+void StorageLocalResourceProviderProcess::watchProfiles()
 {
-  LOG(INFO)
-    << "Updating metadata for profiles: " << stringify(knownProfiles);
+  auto err = [](const string& message) {
+    LOG(ERROR) << "Failed to watch for DiskProfileAdaptor: " << message;
+  };
+
+  // TODO(chhsiao): Consider retrying with backoff.
+  loop(
+      self(),
+      [=] {
+        return diskProfileAdaptor->watch(profileInfos.keys(), info);
+      },
+      [=](const hashset<string>& profiles) {
+        CHECK(info.has_id());
+
+        LOG(INFO)
+          << "Updating profiles " << stringify(profiles)
+          << " for resource provider " << info.id();
+
+        std::function<Future<Nothing>()> update = defer(self(), [=] {
+          return updateProfiles(profiles)
+            .then(defer(self(), &Self::reconcileStoragePools));
+        });
 
+        // Update the profile mapping and storage pools in `sequence` to wait
+        // for any pending operation that disallows reconciliation or the last
+        // reconciliation (if any) to finish, and set up `reconciled` to drop
+        // incoming operations that disallow reconciliation until the storage
+        // pools are reconciled.
+        reconciled = sequence.add(update);
+
+        return reconciled
+          .then(defer(self(), [=]() -> ControlFlow<Nothing> {
+            return Continue();
+          }));
+      })
+    .onFailed(std::bind(err, lambda::_1))
+    .onDiscarded(std::bind(err, "future discarded"));
+}
+
+
+Future<Nothing> StorageLocalResourceProviderProcess::updateProfiles(
+    const hashset<string>& profiles)
+{
   vector<Future<Nothing>> futures;
-  foreach (const string& profile, knownProfiles) {
-    // Since profiles are immutable after creation and cannot be
-    // deleted, we do not need to update any profile that is already in
-    // the mapping.
+  foreach (const string& profile, profiles) {
+    // Since profiles are immutable after creation, we do not need to
+    // translate any profile that is already in the mapping.
     // TODO(chhsiao): Handle profile deactivation.
     if (profileInfos.contains(profile)) {
       continue;
     }
 
+    auto err = [](const string& profile, const string& message) {
+      LOG(ERROR)
+        << "Failed to translate profile '" << profile << "': " << message;
+    };
+
     futures.push_back(diskProfileAdaptor->translate(profile, info)
-      .then(defer(self(), [=](const DiskProfileAdaptor::ProfileInfo& info) {
-        profileInfos.put(profile, info);
+      .then(defer(self(), [=](
+          const DiskProfileAdaptor::ProfileInfo& profileInfo) {
+        profileInfos.put(profile, profileInfo);
         return Nothing();
-      })));
+      }))
+      .onFailed(std::bind(err, profile, lambda::_1))
+      .onDiscarded(std::bind(err, profile, "future discarded")));
   }
 
-  return collect(futures).then([] { return Nothing(); });
-}
-
-
-Future<Nothing> StorageLocalResourceProviderProcess::reconcileProfileUpdates()
-{
-  // Do nothing if the resource provider ID is not known yet, since it
-  // is used to construct the resource metadata of storage pools. The
-  // metadata will be constructed in `reconcileResourceProviderState`.
-  if (!info.has_id()) {
-    return Nothing();
-  }
-
-  CHECK(!reconciling);
-
-  LOG(INFO) << "Reconciling storage pools for resource provider " << info.id();
-
-  reconciling = true;
-
-  // We add a lambda into `OperationSequence` so that it will return
-  // after waiting for all pending operations in the sequence.
-  return operationSequence.add(
-      std::function<Future<Nothing>()>([] { return Nothing(); }))
-    .then(defer(self(), &Self::getCapacities))
-    .then(defer(self(), [=](const Resources& discovered) {
-      auto isStoragePool = [](const Resource& r) {
-        return !r.disk().source().has_id();
-      };
-
-      ResourceConversion conversion = reconcileResources(
-          totalResources.filter(isStoragePool),
-          discovered);
-
-      Try<Resources> result = totalResources.apply(conversion);
-      CHECK_SOME(result);
-
-      if (result.get() != totalResources) {
-        LOG(INFO)
-          << "Removing '" << conversion.consumed << "' and adding '"
-          << conversion.converted << "' to the total resources";
-
-        totalResources = result.get();
-        checkpointResourceProviderState();
-
-        // NOTE: We ensure that the first `UPDATE_STATE` of the current
-        // subscription is sent by `reconcileResourceProviderState`, so
-        // that the total resources contain existing volumes.
-        if (state == READY) {
-          // NOTE: We always update the resource version before sending
-          // an `UPDATE_STATE`, so that any racing speculative operation
-          // will be rejected. Otherwise, the speculative resource
-          // conversion done on the master will be cancelled out.
-          resourceVersion = id::UUID::random();
-          sendResourceProviderStateUpdate();
-        }
-      }
-
-      LOG(INFO)
-        << "Finished reconciliation of storage pools for resource provider "
-        << info.id();
-
-      reconciling = false;
-
-      return Nothing();
-    }));
+  // We use `await` here to return a future that never fails, so the loop in
+  // `watchProfiles` will continue to watch for profile changes. If any profile
+  // translation fails, the profile will not be added to the set of known
+  // profiles and thus the disk profile adaptor will notify the resource
+  // provider again.
+  return await(futures).then([] { return Nothing(); });
 }
 
 
@@ -1436,9 +1494,11 @@ void StorageLocalResourceProviderProcess::subscribed(
     fatal();
   };
 
-  // Reconcile resources after obtaining the resource provider ID.
-  // TODO(chhsiao): Do the reconciliation early.
-  reconcileResourceProviderState()
+  // Reconcile resources after obtaining the resource provider ID and start
+  // watching for profile changes after the reconciliation.
+  // TODO(chhsiao): Reconcile and watch for profile changes early.
+  reconciled = reconcileResourceProviderState()
+    .onReady(defer(self(), &Self::watchProfiles))
     .onFailed(defer(self(), std::bind(die, lambda::_1)))
     .onDiscarded(defer(self(), std::bind(die, "future discarded")));
 }
@@ -1467,7 +1527,7 @@ void StorageLocalResourceProviderProcess::applyOperation(
         "Cannot apply operation in SUBSCRIBED state");
   }
 
-  if (reconciling) {
+  if (reconciled.isPending() && !allowsReconciliation(operation.info())) {
     return dropOperation(
         uuid.get(),
         frameworkId,
@@ -2813,14 +2873,9 @@ Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
     .then(defer(self(), [=](csi::v0::Client client) {
       vector<Future<Resources>> futures;
 
-      foreach (const string& profile, knownProfiles) {
-        CHECK(profileInfos.contains(profile));
-
-        // TODO(chhsiao): Skip inactive profiles.
-
-        const DiskProfileAdaptor::ProfileInfo& profileInfo =
-          profileInfos.at(profile);
-
+      foreachpair (const string& profile,
+                   const DiskProfileAdaptor::ProfileInfo& profileInfo,
+                   profileInfos) {
         csi::v0::GetCapacityRequest request;
         request.add_volume_capabilities()->CopyFrom(profileInfo.capability);
         *request.mutable_parameters() = profileInfo.parameters;
@@ -2859,7 +2914,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
   CHECK(!protobuf::isTerminalState(operation.latest_status().state()));
 
   Future<vector<ResourceConversion>> conversions;
-  Option<Resource> source;
 
   switch (operation.info().type()) {
     case Offer::Operation::RESERVE:
@@ -2876,9 +2930,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
     case Offer::Operation::CREATE_VOLUME: {
       CHECK(operation.info().has_create_volume());
 
-      source = operation.info().create_volume().source();
       conversions = applyCreateVolumeOrBlock(
-          source.get(),
+          operation.info().create_volume().source(),
           operationUuid,
           operation.info().create_volume().target_type());
 
@@ -2887,17 +2940,16 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
     case Offer::Operation::DESTROY_VOLUME: {
       CHECK(operation.info().has_destroy_volume());
 
-      source = operation.info().destroy_volume().volume();
-      conversions = applyDestroyVolumeOrBlock(source.get());
+      conversions = applyDestroyVolumeOrBlock(
+          operation.info().destroy_volume().volume());
 
       break;
     }
     case Offer::Operation::CREATE_BLOCK: {
       CHECK(operation.info().has_create_block());
 
-      source = operation.info().create_block().source();
       conversions = applyCreateVolumeOrBlock(
-          source.get(),
+          operation.info().create_block().source(),
           operationUuid,
           Resource::DiskInfo::Source::BLOCK);
 
@@ -2906,8 +2958,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
     case Offer::Operation::DESTROY_BLOCK: {
       CHECK(operation.info().has_destroy_block());
 
-      source = operation.info().destroy_block().block();
-      conversions = applyDestroyVolumeOrBlock(source.get());
+      conversions = applyDestroyVolumeOrBlock(
+          operation.info().destroy_block().block());
 
       break;
     }
@@ -2924,7 +2976,9 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
     }
   }
 
-  // NOTE: The code below is executed only when applying a storage operation.
+  CHECK(!protobuf::isSpeculativeOperation(operation.info()))
+    << "Unexpected speculative operation: " << operation.info().type();
+
   shared_ptr<Promise<Nothing>> promise(new Promise<Nothing>());
 
   conversions
@@ -2946,12 +3000,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::_applyOperation(
 
   Future<Nothing> future = promise->future();
 
-  CHECK_SOME(source);
-  if (source->disk().source().has_profile()) {
-    // We place the future in `operationSequence` so it can be waited
-    // for during reconciliation upon profile updates.
-    operationSequence.add(
-        std::function<Future<Nothing>()>([future] { return future; }));
+  if (!allowsReconciliation(operation.info())) {
+    // We place the future in `sequence` so it can be waited for before
+    // reconciling the storage pools.
+    sequence.add(std::function<Future<Nothing>()>([future] { return future; }));
   }
 
   return future;
@@ -3199,7 +3251,6 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
   CHECK(volumes.contains(resource.disk().source().id()));
 
   // Sequentialize the deletion with other operation on the same volume.
-  // NOTE: A resource has no profile iff it is a pre-existing volume.
   return volumes.at(resource.disk().source().id()).sequence->add(
       std::function<Future<Nothing>()>(defer(
           self(),
@@ -3207,19 +3258,44 @@ StorageLocalResourceProviderProcess::applyDestroyVolumeOrBlock(
           resource.disk().source().id(),
           !resource.disk().source().has_profile())))
     .then(defer(self(), [=]() {
-      // TODO(chhsiao): Convert to an empty resource and update all
-      // storage pools if the profile has been deactivated.
       Resource converted = resource;
       converted.mutable_disk()->mutable_source()->set_type(
           Resource::DiskInfo::Source::RAW);
       converted.mutable_disk()->mutable_source()->clear_path();
       converted.mutable_disk()->mutable_source()->clear_mount();
 
-      // NOTE: We keep the source ID and metadata if it is a
-      // pre-existing volume, which has no profile.
+      // We only clear the volume ID and metadata if the destroyed volume is
+      // not a pre-existing volume.
       if (resource.disk().source().has_profile()) {
         converted.mutable_disk()->mutable_source()->clear_id();
         converted.mutable_disk()->mutable_source()->clear_metadata();
+
+        if (!profileInfos.contains(resource.disk().source().profile())) {
+          // The destroyed volume is converted into an empty resource to prevent
+          // the freed disk from being sent out with a disappeared profile.
+          converted.mutable_scalar()->set_value(0);
+
+          // Since the profile disappears, the freed disk might be claimed by
+          // other appeared profiles. If there is an ongoing reconciliation, it
+          // is waiting for this operation to finish and will recover the freed
+          // disk, so no reconciliation should be done here. Otherwise, we
+          // reconcile the storage pools to recover the freed disk.
+          if (!reconciled.isPending()) {
+            CHECK(info.has_id());
+
+            LOG(INFO)
+              << "Reconciling storage pools for resource provider " << info.id()
+              << " after the disk with profile '"
+              << resource.disk().source().profile() << "' has been freed";
+
+            // Reconcile the storage pools in `sequence` to wait for any other
+            // pending operation that disallows reconciliation to finish, and
+            // set up `reconciled` to drop incoming operations that disallow
+            // reconciliation until the storage pools are reconciled.
+            reconciled = sequence.add(std::function<Future<Nothing>()>(
+                defer(self(), &Self::reconcileStoragePools)));
+          }
+        }
       }
 
       vector<ResourceConversion> conversions;
@@ -3319,10 +3395,7 @@ Try<Nothing> StorageLocalResourceProviderProcess::updateOperationStatus(
 
   if (error.isSome()) {
     // We only send `UPDATE_STATE` for failed speculative operations.
-    if (operation.info().type() == Offer::Operation::RESERVE ||
-        operation.info().type() == Offer::Operation::UNRESERVE ||
-        operation.info().type() == Offer::Operation::CREATE ||
-        operation.info().type() == Offer::Operation::DESTROY) {
+    if (protobuf::isSpeculativeOperation(operation.info())) {
       resourceVersion = id::UUID::random();
       sendResourceProviderStateUpdate();
     }
@@ -3419,15 +3492,18 @@ void StorageLocalResourceProviderProcess::sendResourceProviderStateUpdate()
     << "' and " << update->operations_size() << " operations to agent "
     << slaveId;
 
-  auto err = [](const ResourceProviderID& id, const string& message) {
+  // NOTE: We terminate the resource provider here if the state cannot be
+  // updated, so that the state is in sync with the agent's view.
+  auto die = [=](const ResourceProviderID& id, const string& message) {
     LOG(ERROR)
       << "Failed to update state for resource provider " << id << ": "
       << message;
+    fatal();
   };
 
   driver->send(evolve(call))
-    .onFailed(std::bind(err, info.id(), lambda::_1))
-    .onDiscarded(std::bind(err, info.id(), "future discarded"));
+    .onFailed(std::bind(die, info.id(), lambda::_1))
+    .onDiscarded(std::bind(die, info.id(), "future discarded"));
 }