You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2019/08/30 11:05:20 UTC

[mesos] branch master updated: Gracefully handled duplicated volumes from non-conforming CSI plugins.

This is an automated email from the ASF dual-hosted git repository.

bennoe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git


The following commit(s) were added to refs/heads/master by this push:
     new 43b86da  Gracefully handled duplicated volumes from non-conforming CSI plugins.
43b86da is described below

commit 43b86da531a889b1c4b1d7ca6acb2eb924ea01e1
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Fri Aug 30 13:04:22 2019 +0200

    Gracefully handled duplicated volumes from non-conforming CSI plugins.
    
    If the storage local resource provider (SLRP) uses a plugin that does not
    conform to the CSI spec and reports duplicated volumes, the duplicates are
    now removed instead of triggering a CHECK failure.
    
    Review: https://reviews.apache.org/r/71414/
---
 src/resource_provider/storage/provider.cpp         |  78 +++++++++------
 .../storage_local_resource_provider_tests.cpp      | 109 +++++++++++++++++++++
 2 files changed, 159 insertions(+), 28 deletions(-)

diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index f180af8..0a8dc26 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -673,8 +673,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
         }
       }
 
-      LOG(INFO) << "Finished recovery for resource provider with type '"
-                << info.type() << "' and name '" << info.name() << "'";
+      LOG(INFO)
+        << "Recovered resources '" << totalResources << "' and "
+        << operations.size() << " operations for resource provider with type '"
+        << info.type() << "' and name '" << info.name() << "'";
 
       state = DISCONNECTED;
 
@@ -1081,45 +1083,73 @@ StorageLocalResourceProviderProcess::getExistingVolumes()
 
   return volumeManager->listVolumes()
     .then(defer(self(), [=](const vector<VolumeInfo>& volumeInfos) {
+      // If a volume is duplicated or a volume context has been changed by a
+      // non-conforming CSI plugin, we need to construct a resources conversion
+      // to remove the duplicate and update the metadata, so we maintain the
+      // resources to be removed and those to be added here.
+      Resources toRemove;
+      Resources toAdd;
+
       // Since we only support "exclusive" (MOUNT or BLOCK) disks, there should
       // be only one checkpointed resource for each volume ID.
       hashmap<string, Resource> checkpointedMap;
       foreach (const Resource& resource, totalResources) {
         if (resource.disk().source().has_id()) {
-          CHECK(!checkpointedMap.contains(resource.disk().source().id()));
-          checkpointedMap.put(resource.disk().source().id(), resource);
+          // If the checkpointed resources contain duplicated volumes because of
+          // a non-conforming CSI plugin, remove the duplicate.
+          if (checkpointedMap.contains(resource.disk().source().id())) {
+            LOG(WARNING) << "Removing duplicated volume '" << resource
+                         << "' from the total resources";
+
+            toRemove += resource;
+          } else {
+            checkpointedMap.put(resource.disk().source().id(), resource);
+          }
         }
       }
 
       // The "discovered" resources consist of RAW disk resources, one for each
       // volume reported by the CSI plugin.
       Resources discovered;
-
-      // If any volume context has been changed by a non-conforming CSI plugin,
-      // we need to construct a resources conversion to reflect the
-      // corresponding metadata changes, so we maintain the resources to be
-      // removed and those to be added here.
-      Resources metadataToRemove;
-      Resources metadataToAdd;
+      hashset<string> discoveredVolumeIds;
 
       foreach (const VolumeInfo& volumeInfo, volumeInfos) {
-        Option<string> profile;
-        Option<Labels> metadata = volumeInfo.context.empty()
+        const Option<string> profile =
+          checkpointedMap.contains(volumeInfo.id) &&
+          checkpointedMap.at(volumeInfo.id).disk().source().has_profile()
+            ? checkpointedMap.at(volumeInfo.id).disk().source().profile()
+            : Option<string>::none();
+
+        const Option<Labels> metadata = volumeInfo.context.empty()
           ? Option<Labels>::none()
           : convertStringMapToLabels(volumeInfo.context);
 
+        const Resource resource = createRawDiskResource(
+            info,
+            volumeInfo.capacity,
+            profile,
+            vendor,
+            volumeInfo.id,
+            metadata);
+
+        if (discoveredVolumeIds.contains(volumeInfo.id)) {
+          LOG(WARNING) << "Dropping duplicated volume '" << resource
+                       << "' from the discovered resources";
+
+          continue;
+        }
+
+        discovered += resource;
+        discoveredVolumeIds.insert(volumeInfo.id);
+
         if (checkpointedMap.contains(volumeInfo.id)) {
           const Resource& resource = checkpointedMap.at(volumeInfo.id);
 
-          if (resource.disk().source().has_profile()) {
-            profile = resource.disk().source().profile();
-          }
-
           // If the volume context has been changed by a non-conforming CSI
           // plugin, the changes will be reflected in a resource conversion.
           if (resource.disk().source().metadata() !=
               metadata.getOrElse(Labels())) {
-            metadataToRemove += resource;
+            toRemove += resource;
 
             Resource changed = resource;
             if (metadata.isSome()) {
@@ -1129,21 +1159,13 @@ StorageLocalResourceProviderProcess::getExistingVolumes()
               changed.mutable_disk()->mutable_source()->clear_metadata();
             }
 
-            metadataToAdd += changed;
+            toAdd += changed;
           }
         }
-
-        discovered += createRawDiskResource(
-            info,
-            volumeInfo.capacity,
-            profile,
-            vendor,
-            volumeInfo.id,
-            metadata);
       }
 
       ResourceConversion metadataConversion(
-          std::move(metadataToRemove), std::move(metadataToAdd));
+          std::move(toRemove), std::move(toAdd));
 
       Resources checkpointed = CHECK_NOTERROR(
           totalResources.filter([](const Resource& resource) {
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 05daf2a..089aa97 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -1374,6 +1374,115 @@ TEST_P(StorageLocalResourceProviderTest, RecoverDiskWithChangedMetadata)
 }
 
 
+// This test verifies that the storage local resource provider counts a volume
+// reported twice by a CSI plugin only once. Regression test for MESOS-9965.
+TEST_P(StorageLocalResourceProviderTest, RecoverDuplicatedVolumes)
+{
+  const string mockCsiEndpoint =
+    "unix://" + path::join(sandbox.get(), "mock_csi.sock");
+
+  MockCSIPlugin plugin;
+  ASSERT_SOME(plugin.startup(mockCsiEndpoint));
+
+  setupResourceProviderConfig(Bytes(0), None(), mockCsiEndpoint);
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  if (GetParam() == csi::v0::API_VERSION) {
+    EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v0::ListVolumesResponse*>()))
+      .WillRepeatedly(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v0::ListVolumesRequest* request,
+          csi::v0::ListVolumesResponse* response) {
+        csi::v0::Volume volume;
+        volume.set_capacity_bytes(Gigabytes(2).bytes());
+        volume.set_id("volume1");
+
+        // Report the same volume twice (violates the CSI spec).
+        *response->add_entries()->mutable_volume() = volume;
+        *response->add_entries()->mutable_volume() = volume;
+
+        return grpc::Status::OK;
+      }));
+  } else {
+    EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v1::ListVolumesResponse*>()))
+      .WillRepeatedly(Invoke([&](
+          grpc::ServerContext* context,
+          const csi::v1::ListVolumesRequest* request,
+          csi::v1::ListVolumesResponse* response) {
+        csi::v1::Volume volume;
+        volume.set_capacity_bytes(Gigabytes(2).bytes());
+        volume.set_volume_id("volume1");
+
+        // Report the same volume twice (violates the CSI spec).
+        *response->add_entries()->mutable_volume() = volume;
+        *response->add_entries()->mutable_volume() = volume;
+
+        return grpc::Status::OK;
+      }));
+  }
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Register a framework to receive resource provider offers.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage/role");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // Filter used when declining offers without the wanted resources; it
+  // refuses them for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
+
+  // Decline offers not matched by the more specific expectations set later,
+  // i.e., offers that carry no resource provider resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      &Resources::hasResourceProvider)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  // Restart the agent; the outstanding offer is expected to be rescinded.
+  EXPECT_CALL(sched, offerRescinded(_, _));
+
+  slave.get()->terminate();
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      &Resources::hasResourceProvider)))
+    .WillOnce(FutureArg<1>(&offers));
+
+  slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Wait for an offer to verify that the resource provider comes back.
+  AWAIT_READY(offers);
+  ASSERT_EQ(1u, offers->size());
+
+  Offer offer = offers->at(0);
+
+  Resources resourceProviderResources =
+    Resources(offer.resources()).filter(&Resources::hasResourceProvider);
+
+  EXPECT_SOME_EQ(Gigabytes(2), resourceProviderResources.disk());
+}
+
+
 // This test verifies that a framework cannot create a volume during and after
 // the profile disappears, and destroying a volume with a stale profile will
 // recover the freed disk with another appeared profile.