You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by be...@apache.org on 2019/08/30 11:05:20 UTC
[mesos] branch master updated: Gracefully handled duplicated
volumes from non-conforming CSI plugins.
This is an automated email from the ASF dual-hosted git repository.
bennoe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/mesos.git
The following commit(s) were added to refs/heads/master by this push:
new 43b86da Gracefully handled duplicated volumes from non-conforming CSI plugins.
43b86da is described below
commit 43b86da531a889b1c4b1d7ca6acb2eb924ea01e1
Author: Chun-Hung Hsiao <ch...@apache.org>
AuthorDate: Fri Aug 30 13:04:22 2019 +0200
Gracefully handled duplicated volumes from non-conforming CSI plugins.
If the SLRP uses a plugin that does not conform to the CSI spec and
reports duplicated volumes, the duplicates will be removed.
Review: https://reviews.apache.org/r/71414/
---
src/resource_provider/storage/provider.cpp | 78 +++++++++------
.../storage_local_resource_provider_tests.cpp | 109 +++++++++++++++++++++
2 files changed, 159 insertions(+), 28 deletions(-)
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index f180af8..0a8dc26 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -673,8 +673,10 @@ Future<Nothing> StorageLocalResourceProviderProcess::recover()
}
}
- LOG(INFO) << "Finished recovery for resource provider with type '"
- << info.type() << "' and name '" << info.name() << "'";
+ LOG(INFO)
+ << "Recovered resources '" << totalResources << "' and "
+ << operations.size() << " operations for resource provider with type '"
+ << info.type() << "' and name '" << info.name() << "'";
state = DISCONNECTED;
@@ -1081,45 +1083,73 @@ StorageLocalResourceProviderProcess::getExistingVolumes()
return volumeManager->listVolumes()
.then(defer(self(), [=](const vector<VolumeInfo>& volumeInfos) {
+ // If a volume is duplicated or a volume context has been changed by a
+ // non-conforming CSI plugin, we need to construct a resources conversion
+ // to remove the duplicate and update the metadata, so we maintain the
+ // resources to be removed and those to be added here.
+ Resources toRemove;
+ Resources toAdd;
+
// Since we only support "exclusive" (MOUNT or BLOCK) disks, there should
// be only one checkpointed resource for each volume ID.
hashmap<string, Resource> checkpointedMap;
foreach (const Resource& resource, totalResources) {
if (resource.disk().source().has_id()) {
- CHECK(!checkpointedMap.contains(resource.disk().source().id()));
- checkpointedMap.put(resource.disk().source().id(), resource);
+ // If the checkpointed resources contain duplicated volumes because of
+ // a non-conforming CSI plugin, remove the duplicate.
+ if (checkpointedMap.contains(resource.disk().source().id())) {
+ LOG(WARNING) << "Removing duplicated volume '" << resource
+ << "' from the total resources";
+
+ toRemove += resource;
+ } else {
+ checkpointedMap.put(resource.disk().source().id(), resource);
+ }
}
}
// The "discovered" resources consist of RAW disk resources, one for each
// volume reported by the CSI plugin.
Resources discovered;
-
- // If any volume context has been changed by a non-conforming CSI plugin,
- // we need to construct a resources conversion to reflect the
- // corresponding metadata changes, so we maintain the resources to be
- // removed and those to be added here.
- Resources metadataToRemove;
- Resources metadataToAdd;
+ hashset<string> discoveredVolumeIds;
foreach (const VolumeInfo& volumeInfo, volumeInfos) {
- Option<string> profile;
- Option<Labels> metadata = volumeInfo.context.empty()
+ const Option<string> profile =
+ checkpointedMap.contains(volumeInfo.id) &&
+ checkpointedMap.at(volumeInfo.id).disk().source().has_profile()
+ ? checkpointedMap.at(volumeInfo.id).disk().source().profile()
+ : Option<string>::none();
+
+ const Option<Labels> metadata = volumeInfo.context.empty()
? Option<Labels>::none()
: convertStringMapToLabels(volumeInfo.context);
+ const Resource resource = createRawDiskResource(
+ info,
+ volumeInfo.capacity,
+ profile,
+ vendor,
+ volumeInfo.id,
+ metadata);
+
+ if (discoveredVolumeIds.contains(volumeInfo.id)) {
+ LOG(WARNING) << "Dropping duplicated volume '" << resource
+ << "' from the discovered resources";
+
+ continue;
+ }
+
+ discovered += resource;
+ discoveredVolumeIds.insert(volumeInfo.id);
+
if (checkpointedMap.contains(volumeInfo.id)) {
const Resource& resource = checkpointedMap.at(volumeInfo.id);
- if (resource.disk().source().has_profile()) {
- profile = resource.disk().source().profile();
- }
-
// If the volume context has been changed by a non-conforming CSI
// plugin, the changes will be reflected in a resource conversion.
if (resource.disk().source().metadata() !=
metadata.getOrElse(Labels())) {
- metadataToRemove += resource;
+ toRemove += resource;
Resource changed = resource;
if (metadata.isSome()) {
@@ -1129,21 +1159,13 @@ StorageLocalResourceProviderProcess::getExistingVolumes()
changed.mutable_disk()->mutable_source()->clear_metadata();
}
- metadataToAdd += changed;
+ toAdd += changed;
}
}
-
- discovered += createRawDiskResource(
- info,
- volumeInfo.capacity,
- profile,
- vendor,
- volumeInfo.id,
- metadata);
}
ResourceConversion metadataConversion(
- std::move(metadataToRemove), std::move(metadataToAdd));
+ std::move(toRemove), std::move(toAdd));
Resources checkpointed = CHECK_NOTERROR(
totalResources.filter([](const Resource& resource) {
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 05daf2a..089aa97 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -1374,6 +1374,115 @@ TEST_P(StorageLocalResourceProviderTest, RecoverDiskWithChangedMetadata)
}
+// This test verifies that the storage local resource provider can properly
+// handle duplicated volumes. This is a regression test for MESOS-9965.
+TEST_P(StorageLocalResourceProviderTest, RecoverDuplicatedVolumes)
+{
+ const string mockCsiEndpoint =
+ "unix://" + path::join(sandbox.get(), "mock_csi.sock");
+
+ MockCSIPlugin plugin;
+ ASSERT_SOME(plugin.startup(mockCsiEndpoint));
+
+ setupResourceProviderConfig(Bytes(0), None(), mockCsiEndpoint);
+
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ if (GetParam() == csi::v0::API_VERSION) {
+ EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v0::ListVolumesResponse*>()))
+ .WillRepeatedly(Invoke([&](
+ grpc::ServerContext* context,
+ const csi::v0::ListVolumesRequest* request,
+ csi::v0::ListVolumesResponse* response) {
+ csi::v0::Volume volume;
+ volume.set_capacity_bytes(Gigabytes(2).bytes());
+ volume.set_id("volume1");
+
+ // Report duplicated volumes.
+ *response->add_entries()->mutable_volume() = volume;
+ *response->add_entries()->mutable_volume() = volume;
+
+ return grpc::Status::OK;
+ }));
+ } else {
+ EXPECT_CALL(plugin, ListVolumes(_, _, A<csi::v1::ListVolumesResponse*>()))
+ .WillRepeatedly(Invoke([&](
+ grpc::ServerContext* context,
+ const csi::v1::ListVolumesRequest* request,
+ csi::v1::ListVolumesResponse* response) {
+ csi::v1::Volume volume;
+ volume.set_capacity_bytes(Gigabytes(2).bytes());
+ volume.set_volume_id("volume1");
+
+ // Report duplicated volumes.
+ *response->add_entries()->mutable_volume() = volume;
+ *response->add_entries()->mutable_volume() = volume;
+
+ return grpc::Status::OK;
+ }));
+ }
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ // Register a framework to exercise operations.
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.set_roles(0, "storage/role");
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ // We use the following filter to filter offers that do not have wanted
+ // resources for 365 days (the maximum).
+ Filters declineFilters;
+ declineFilters.set_refuse_seconds(Days(365).secs());
+
+ // Decline unwanted offers. The master can send such offers before the
+ // resource provider receives profile updates.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillRepeatedly(DeclineOffers(declineFilters));
+
+ Future<vector<Offer>> offers;
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ &Resources::hasResourceProvider)))
+ .WillOnce(FutureArg<1>(&offers));
+
+ driver.start();
+
+ AWAIT_READY(offers);
+
+ // Restart the agent.
+ EXPECT_CALL(sched, offerRescinded(_, _));
+
+ slave.get()->terminate();
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ &Resources::hasResourceProvider)))
+ .WillOnce(FutureArg<1>(&offers));
+
+ slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ // Wait for an offer to verify that the resource provider comes back.
+ AWAIT_READY(offers);
+ ASSERT_EQ(1u, offers->size());
+
+ Offer offer = offers->at(0);
+
+ Resources resourceProviderResources =
+ Resources(offer.resources()).filter(&Resources::hasResourceProvider);
+
+ EXPECT_SOME_EQ(Gigabytes(2), resourceProviderResources.disk());
+}
+
+
// This test verifies that a framework cannot create a volume during and after
// the profile disappears, and destroying a volume with a stale profile will
// recover the freed disk with another appeared profile.