You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/19 23:14:36 UTC

[06/12] mesos git commit: Refactored and fixed bugs for SLRP resource reconciliation.

Refactored and fixed bugs for SLRP resource reconciliation.

Review: https://reviews.apache.org/r/64644/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/28bf0891
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/28bf0891
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/28bf0891

Branch: refs/heads/master
Commit: 28bf0891bf0b985153fa129e69fda0b7fd97d456
Parents: b36152d
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue Dec 19 11:25:06 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Tue Dec 19 15:14:22 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 261 ++++++++++++------------
 1 file changed, 133 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/28bf0891/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index a103494..8510a31 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -342,6 +342,9 @@ private:
   Future<Nothing> recoverStatusUpdates();
   void doReliableRegistration();
   Future<Nothing> reconcileResourceProviderState();
+  ResourceConversion reconcileResources(
+      const Resources& checkpointed,
+      const Resources& discovered);
 
   // Functions for received events.
   void subscribed(const Event::Subscribed& subscribed);
@@ -358,7 +361,6 @@ private:
 
   Future<Nothing> prepareControllerService();
   Future<Nothing> prepareNodeService();
-  Future<Resources> discoverResources();
   Future<Nothing> controllerPublish(const string& volumeId);
   Future<Nothing> controllerUnpublish(const string& volumeId);
   Future<Nothing> nodePublish(const string& volumeId);
@@ -372,7 +374,8 @@ private:
       const string& volumeId,
       const Option<Labels>& metadata,
       const csi::VolumeCapability& capability);
-  Future<Resources> getCapacities(const hashmap<string, ProfileData>& profiles);
+  Future<Resources> listVolumes();
+  Future<Resources> getCapacities();
 
   Future<Nothing> _applyOfferOperation(const id::UUID& operationUuid);
 
@@ -969,6 +972,8 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
       // We replay all pending operations here, so that if a volume is
       // created or deleted before the last failover, the result will be
       // reflected in the total resources before reconciliation.
+      list<Future<Nothing>> futures;
+
       foreachpair (const id::UUID& uuid,
                    const OfferOperation& operation,
                    offerOperations) {
@@ -982,12 +987,14 @@ Future<Nothing> StorageLocalResourceProviderProcess::recoverStatusUpdates()
             << message;
         };
 
-        _applyOfferOperation(uuid)
+        futures.push_back(_applyOfferOperation(uuid)
           .onFailed(std::bind(err, uuid, lambda::_1))
-          .onDiscarded(std::bind(err, uuid, "future discarded"));
+          .onDiscarded(std::bind(err, uuid, "future discarded")));
       }
 
-      return Nothing();
+      // We await the futures instead of collect them because it is OK
+      // for offer operations to fail.
+      return await(futures).then([] { return Nothing(); });
     }));
 }
 
@@ -1025,69 +1032,77 @@ Future<Nothing>
 StorageLocalResourceProviderProcess::reconcileResourceProviderState()
 {
   return recoverStatusUpdates()
-    .then(defer(self(), &Self::discoverResources))
-    .then(defer(self(), [=](Resources discoveredResources) {
-      // NODE: If a resource in the checkpointed total resources is
-      // missing in the discovered resources, we will still keep it if
-      // it is converted by an offer operation before (i.e., has extra
-      // info other than the default reservations). The reason is that
-      // we want to maintain a consistent view with frameworks, and do
-      // not want to lose any data on persistent volumes due to some
-      // temporarily CSI plugin faults. Other missing resources that are
-      // "unconverted" by any framework will be removed from the total
-      // resources. Then, any newly discovered resource will be reported
-      // under the default reservations.
-
-      Resources result;
-      Resources unconvertedTotal;
-
-      foreach (const Resource& resource, totalResources) {
-        Resource unconverted = createRawDiskResource(
-            info,
-            Bytes(resource.scalar().value(), Bytes::MEGABYTES),
-            resource.disk().source().has_profile()
-              ? resource.disk().source().profile() : Option<string>::none(),
-            resource.disk().source().has_id()
-              ? resource.disk().source().id() : Option<string>::none(),
-            resource.disk().source().has_metadata()
-              ? resource.disk().source().metadata() : Option<Labels>::none());
-        if (discoveredResources.contains(unconverted)) {
-          // The checkponited resource appears in the discovered resources.
-          result += resource;
-          unconvertedTotal += unconverted;
-        } else if (!totalResources.contains(unconverted)) {
-          // The checkpointed resource is missing but converted by a
-          // framework or the operator before, so we keep it.
-          result += resource;
-
-          LOG(WARNING)
-            << "Missing converted resource '" << resource
-            << "'. This might cause further offer operations to fail.";
-        }
-      }
-
-      // NOTE: The states of newly discovered pre-existing volumes will
-      // be added to `volumes` when `CREATE_VOLUME` or `CREATE_BLOCK`
-      // operations are applied.
-      const Resources newResources = discoveredResources - unconvertedTotal;
-      result += newResources;
+    .then(defer(self(), [=] {
+      return collect(list<Future<Resources>>{listVolumes(), getCapacities()})
+        .then(defer(self(), [=](const list<Resources>& discovered) {
+          ResourceConversion conversion = reconcileResources(
+              totalResources,
+              accumulate(discovered.begin(), discovered.end(), Resources()));
+
+          Try<Resources> result = totalResources.apply(conversion);
+          CHECK_SOME(result);
+
+          if (result.get() != totalResources) {
+            totalResources = result.get();
+            checkpointResourceProviderState();
+          }
 
-      LOG(INFO) << "Adding new resources '" << newResources << "'";
+          sendResourceProviderStateUpdate();
+          statusUpdateManager.resume();
 
-      // TODO(chhsiao): Check that all profiles exist.
+          state = READY;
 
-      if (result != totalResources) {
-        totalResources = result;
-        checkpointResourceProviderState();
-      }
+          return Nothing();
+        }));
+    }));
+}
 
-      sendResourceProviderStateUpdate();
-      statusUpdateManager.resume();
 
-      state = READY;
+ResourceConversion StorageLocalResourceProviderProcess::reconcileResources(
+    const Resources& checkpointed,
+    const Resources& discovered)
+{
+  // NOTE: If a resource in the checkpointed resources is missing in the
+  // discovered resources, we will still keep it if it is converted by
+  // an offer operation before (i.e., has extra info other than the
+  // default reservations). The reason is that we want to maintain a
+  // consistent view with frameworks, and do not want to lose any data on
+  // persistent volumes due to some temporary CSI plugin faults. Other
+  // missing resources that are "unconverted" by any framework will be
+  // removed. Then, any newly discovered resource will be added.
+  Resources toRemove;
+  Resources toAdd = discovered;
+
+  foreach (const Resource& resource, checkpointed) {
+    Resource unconverted = createRawDiskResource(
+        info,
+        Bytes(resource.scalar().value() * Bytes::MEGABYTES),
+        resource.disk().source().has_profile()
+          ? resource.disk().source().profile() : Option<string>::none(),
+        resource.disk().source().has_id()
+          ? resource.disk().source().id() : Option<string>::none(),
+        resource.disk().source().has_metadata()
+          ? resource.disk().source().metadata() : Option<Labels>::none());
+
+    if (toAdd.contains(unconverted)) {
+      // If the remainder of the discovered resources contains the
+      // "unconverted" version of a checkpointed resource, this is not a
+      // new resource.
+      toAdd -= unconverted;
+    } else if (checkpointed.contains(unconverted)) {
+      // If the remainder of the discovered resources does not contain
+      // the "unconverted" version of the checkpointed resource, the
+      // resource is missing. However, if it remains unconverted in the
+      // checkpoint, we can safely remove it from the total resources.
+      toRemove += unconverted;
+    } else {
+      LOG(WARNING)
+        << "Missing converted resource '" << resource
+        << "'. This might cause further offer operations to fail.";
+    }
+  }

-      return Nothing();
-    }));
+  return ResourceConversion(std::move(toRemove), std::move(toAdd));
 }
 
 
@@ -1786,67 +1801,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::prepareNodeService()
 }
 
 
-// Returns resources reported by the CSI plugin, which are unreserved
-// raw disk resources without any persistent volume.
-Future<Resources> StorageLocalResourceProviderProcess::discoverResources()
-{
-  // NOTE: This can only be called after `prepareControllerService` and
-  // the resource provider ID has been obtained.
-  CHECK_SOME(controllerCapabilities);
-  CHECK(info.has_id());
-
-  list<Future<Resources>> futures;
-  futures.push_back(getCapacities(profiles));
-
-  if (controllerCapabilities->listVolumes) {
-    futures.push_back(getService(controllerContainerId)
-      .then(defer(self(), [=](csi::Client client) {
-        // TODO(chhsiao): Set the max entries and use a loop to do
-        // mutliple `ListVolumes` calls.
-        csi::ListVolumesRequest request;
-        request.mutable_version()->CopyFrom(csiVersion);
-
-        return client.ListVolumes(request)
-          .then(defer(self(), [=](const csi::ListVolumesResponse& response) {
-            Resources resources;
-
-            // Recover volume profiles from the checkpointed state.
-            hashmap<string, string> volumesToProfiles;
-            foreach (const Resource& resource, totalResources) {
-              if (resource.disk().source().has_id() &&
-                  resource.disk().source().has_profile()) {
-                volumesToProfiles.put(
-                    resource.disk().source().id(),
-                    resource.disk().source().profile());
-              }
-            }
-
-            foreach (const auto& entry, response.entries()) {
-              resources += createRawDiskResource(
-                  info,
-                  Bytes(entry.volume_info().capacity_bytes()),
-                  volumesToProfiles.contains(entry.volume_info().id())
-                    ? volumesToProfiles.at(entry.volume_info().id())
-                    : Option<string>::none(),
-                  entry.volume_info().id(),
-                  entry.volume_info().attributes().empty()
-                    ? Option<Labels>::none()
-                    : convertStringMapToLabels(
-                          entry.volume_info().attributes()));
-            }
-
-            return resources;
-          }));
-      })));
-  }
-
-  return collect(futures)
-    .then(defer(self(), [=](const list<Resources>& resources) {
-      return accumulate(resources.begin(), resources.end(), Resources());
-    }));
-}
-
-
 Future<Nothing> StorageLocalResourceProviderProcess::controllerPublish(
     const string& volumeId)
 {
@@ -2284,17 +2238,68 @@ Future<string> StorageLocalResourceProviderProcess::validateCapability(
 }
 
 
-// Returns RAW disk resources for specified profiles.
-Future<Resources> StorageLocalResourceProviderProcess::getCapacities(
-    const hashmap<string, ProfileData>& profiles)
+Future<Resources> StorageLocalResourceProviderProcess::listVolumes()
+{
+  // NOTE: This can only be called after `prepareControllerService` and
+  // the resource provider ID has been obtained.
+  CHECK_SOME(controllerCapabilities);
+  CHECK(info.has_id());
+
+  // Only used for reconciliation, so return empty resources, not a failure.
+  if (!controllerCapabilities->listVolumes) {
+    return Resources();
+  }
+
+  return getService(controllerContainerId)
+    .then(defer(self(), [=](csi::Client client) {
+      // TODO(chhsiao): Set the max entries and use a loop to do
+      // multiple `ListVolumes` calls.
+      csi::ListVolumesRequest request;
+      request.mutable_version()->CopyFrom(csiVersion);
+
+      return client.ListVolumes(request)
+        .then(defer(self(), [=](const csi::ListVolumesResponse& response) {
+          Resources resources;
+
+          // Recover volume profiles from the checkpointed total resources.
+          hashmap<string, string> volumesToProfiles;
+          foreach (const Resource& resource, totalResources) {
+            if (resource.disk().source().has_id() &&
+                resource.disk().source().has_profile()) {
+              volumesToProfiles.put(
+                  resource.disk().source().id(),
+                  resource.disk().source().profile());
+            }
+          }
+
+          foreach (const auto& entry, response.entries()) {
+            resources += createRawDiskResource(
+                info,
+                Bytes(entry.volume_info().capacity_bytes()),
+                volumesToProfiles.contains(entry.volume_info().id())
+                  ? volumesToProfiles.at(entry.volume_info().id())
+                  : Option<string>::none(),
+                entry.volume_info().id(),
+                entry.volume_info().attributes().empty()
+                  ? Option<Labels>::none()
+                  : convertStringMapToLabels(
+                        entry.volume_info().attributes()));
+          }
+
+          return resources;
+        }));
+    }));
+}
+
+
+Future<Resources> StorageLocalResourceProviderProcess::getCapacities()
 {
   // NOTE: This can only be called after `prepareControllerService` and
   // the resource provider ID has been obtained.
   CHECK_SOME(controllerCapabilities);
   CHECK(info.has_id());
 
-  // We do not return a failure because this is always called when a
-  // profile is added or a `CreateVolume` CSI call is made.
+  // This is only used for reconciliation so no failure is returned.
   if (!controllerCapabilities->getCapacity) {
     return Resources();
   }