You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/12/09 02:54:52 UTC

[5/7] mesos git commit: SLRP reconciliation only preserves missing resources that are converted.

SLRP reconciliation only preserves missing resources that are converted.

When SLRP's checkpointed total resources are inconsistent with the
resources reported by the CSI plugin, we still want to keep the missing
resources that have been converted through an offer operation before, so
that we maintain a consistent view with the frameworks consuming the
resource offers. It is safe to remove unconverted missing resources from
SLRP's checkpointed total.

Review: https://reviews.apache.org/r/64468/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ec4560e4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ec4560e4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ec4560e4

Branch: refs/heads/master
Commit: ec4560e412de6c2244f067520dd9f5dad3d19e49
Parents: 4c5c7ce
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Dec 8 18:12:20 2017 -0800
Committer: Jie Yu <yu...@gmail.com>
Committed: Fri Dec 8 18:54:43 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/storage/provider.cpp | 55 +++++++++++++++++--------
 1 file changed, 38 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ec4560e4/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index baa3a36..6013bfe 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -238,17 +238,20 @@ static inline http::Headers getAuthHeader(const Option<string>& authToken)
 
 
 static inline Resource createRawDiskResource(
-    const ResourceProviderID& resourceProviderId,
+    const ResourceProviderInfo& info,
     double capacity,
     const Option<string>& profile,
     const Option<string>& id = None(),
     const Option<Labels>& metadata = None())
 {
+  CHECK(info.has_id());
+
   Resource resource;
   resource.set_name("disk");
   resource.set_type(Value::SCALAR);
   resource.mutable_scalar()->set_value(capacity);
-  resource.mutable_provider_id()->CopyFrom(resourceProviderId),
+  resource.mutable_provider_id()->CopyFrom(info.id()),
+  resource.mutable_reservations()->CopyFrom(info.default_reservations());
   resource.mutable_disk()->mutable_source()
     ->set_type(Resource::DiskInfo::Source::RAW);
 
@@ -870,17 +873,24 @@ void StorageLocalResourceProviderProcess::doReliableRegistration()
 Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
 {
   return importResources()
-    .then(defer(self(), [=](Resources imported) {
-      // NOTE: We do not support decreasing the total resources for now.
-      // Any resource in the checkpointed state will be reported to the
-      // resource provider manager, even if it is missing in the
-      // imported resources from the plugin. Additional resources from
-      // the plugin will be reported with the default reservation.
-
-      Resources stripped;
+    .then(defer(self(), [=](Resources importedResources) {
+      // NOTE: If a resource in the checkpointed total resources is
+      // missing in the imported resources, we will still keep it if it
+      // was converted by an offer operation before (i.e., has extra info
+      // other than the default reservations). The reason is that we
+      // want to maintain a consistent view with frameworks, and do not
+      // want to lose any data on persistent volumes due to some
+      // temporary CSI plugin faults. Other missing resources that are
+      // "unconverted" by any framework will be removed from the total
+      // resources. Then, any new imported resource will be reported
+      // under the default reservations.
+
+      Resources result;
+      Resources unconvertedTotal;
+
       foreach (const Resource& resource, totalResources) {
-        stripped += createRawDiskResource(
-            resource.provider_id(),
+        Resource unconverted = createRawDiskResource(
+            info,
             resource.scalar().value(),
             resource.disk().source().has_profile()
               ? resource.disk().source().profile() : Option<string>::none(),
@@ -888,10 +898,22 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
               ? resource.disk().source().id() : Option<string>::none(),
             resource.disk().source().has_metadata()
               ? resource.disk().source().metadata() : Option<Labels>::none());
+        if (importedResources.contains(unconverted)) {
+          // The checkpointed resource appears in the imported resources.
+          result += resource;
+          unconvertedTotal += unconverted;
+        } else if (!totalResources.contains(unconverted)) {
+          // The checkpointed resource is missing but converted by a
+          // framework or the operator before, so we keep it.
+          result += resource;
+
+          LOG(WARNING)
+            << "Missing converted resource '" << resource
+            << "'. This might cause further offer operations to fail.";
+        }
       }
 
-      Resources result = totalResources;
-      foreach (Resource resource, imported - stripped) {
+      foreach (Resource resource, importedResources - unconvertedTotal) {
         if (resource.disk().source().has_id() &&
             !volumes.contains(resource.disk().source().id())) {
           csi::state::VolumeState volumeState;
@@ -911,7 +933,6 @@ Future<Nothing> StorageLocalResourceProviderProcess::reconcile()
           checkpointVolumeState(resource.disk().source().id());
         }
 
-        resource.mutable_reservations()->CopyFrom(info.default_reservations());
         result += resource;
 
         LOG(INFO) << "Adding new resource '" << resource << "'";
@@ -1547,7 +1568,7 @@ Future<Resources> StorageLocalResourceProviderProcess::importResources()
 
             foreach (const auto& entry, response.entries()) {
               resources += createRawDiskResource(
-                  info.id(),
+                  info,
                   entry.volume_info().capacity_bytes(),
                   volumesToProfiles.contains(entry.volume_info().id())
                     ? volumesToProfiles.at(entry.volume_info().id())
@@ -1621,7 +1642,7 @@ Future<Resources> StorageLocalResourceProviderProcess::importResources()
                   }
 
                   return createRawDiskResource(
-                      info.id(),
+                      info,
                       response.available_capacity(),
                       profile.empty() ? Option<string>::none() : profile);
               }));