You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/06/01 01:32:29 UTC
[03/12] mesos git commit: Added a unit test for CSI plugin RPC metrics.
Added a unit test for CSI plugin RPC metrics.
This patch adds the `ROOT_CsiPluginRpcMetrics` test, which issues a
`CREATE_VOLUME` followed by a `DESTROY_VOLUME`. The `DESTROY_VOLUME` is
expected to fail because the actual volume is deleted out of band.
Review: https://reviews.apache.org/r/67256
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1a1f0bab
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1a1f0bab
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1a1f0bab
Branch: refs/heads/master
Commit: 1a1f0bab2fde34095c643cefdb24d700441048d0
Parents: 15fc86d
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Tue May 22 14:47:03 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 31 18:29:56 2018 -0700
----------------------------------------------------------------------
.../storage_local_resource_provider_tests.cpp | 293 +++++++++++++++++++
1 file changed, 293 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/1a1f0bab/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 9bd8558..17df704 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -3116,6 +3116,299 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_OperationStateMetrics)
}
+// This test verifies that the storage local resource provider properly
+// reports metrics related to RPCs to CSI plugins.
+// TODO(chhsiao): Currently there is no way to test the `pending` and
+// `cancelled` metrics for RPCs since we have no control over the completion of
+// an operation. Once we support out-of-band CSI plugins through domain sockets,
+// we could test these metrics against a mock CSI plugin.
+TEST_F(StorageLocalResourceProviderTest, ROOT_CsiPluginRpcMetrics)
+{
+ loadUriDiskProfileAdaptorModule();
+
+ setupResourceProviderConfig(Gigabytes(4));
+ setupDiskProfileMapping();
+
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.allocation_interval = Milliseconds(50);
+
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.isolation = "filesystem/linux";
+
+ slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+ slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ // Register a framework to exercise operations.
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.set_roles(0, "storage");
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ // The framework is expected to see the following offers in sequence:
+ // 1. One containing a RAW disk resource before `CREATE_VOLUME`.
+ // 2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
+ // 3. One containing the same MOUNT disk resource after a failed
+ // `DESTROY_VOLUME`.
+ //
+ // We set up the expectations for these offers as the test progresses.
+ Future<vector<Offer>> rawDiskOffers;
+ Future<vector<Offer>> volumeCreatedOffers;
+ Future<vector<Offer>> operationFailedOffers;
+
+ Sequence offers;
+
+ // Offers without the wanted resources are declined with this filter,
+ // which refuses their resources for 365 days (the maximum).
+ Filters declineFilters;
+ declineFilters.set_refuse_seconds(Days(365).secs());
+
+ // Decline offers that contain only the agent's default resources.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillRepeatedly(DeclineOffers(declineFilters));
+
+ // Returns true iff `r` is a disk resource whose source has the given
+ // type and the "volume-default" profile (a storage pool or a volume).
+ auto hasSourceType = [](
+ const Resource& r,
+ const Resource::DiskInfo::Source::Type& type) {
+ return r.has_disk() &&
+ r.disk().has_source() &&
+ r.disk().source().has_profile() &&
+ r.disk().source().profile() == "volume-default" &&
+ r.disk().source().type() == type;
+ };
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+ .InSequence(offers)
+ .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+ driver.start();
+
+ AWAIT_READY(rawDiskOffers);
+ ASSERT_FALSE(rawDiskOffers->empty());
+
+ Option<Resource> source;
+
+ foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
+ if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) {
+ source = resource;
+ break;
+ }
+ }
+
+ ASSERT_SOME(source);
+
+ JSON::Object snapshot = Metrics(); // Baseline before `CREATE_VOLUME`.
+
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.Probe/successes")));
+ EXPECT_EQ(1, snapshot.values.at( metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.Probe/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.GetPluginInfo/successes")));
+ EXPECT_EQ(2, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.GetPluginInfo/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.GetPluginCapabilities/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.GetPluginCapabilities/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.ControllerGetCapabilities/successes"))); // NOLINT(whitespace/line_length)
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.ControllerGetCapabilities/successes"))); // NOLINT(whitespace/line_length)
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.ListVolumes/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.ListVolumes/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.GetCapacity/successes")));
+ EXPECT_EQ(2, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.GetCapacity/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+ EXPECT_EQ(0, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+ EXPECT_EQ(0, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Node.NodeGetCapabilities/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Node.NodeGetCapabilities/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Node.NodeGetId/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Node.NodeGetId/successes")));
+
+ // Create a MOUNT volume from the RAW disk resource found above.
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
+ .InSequence(offers)
+ .WillOnce(FutureArg<1>(&volumeCreatedOffers));
+
+ // We accept with `refuse_seconds` set to 0 so that the resources will not
+ // be filtered for 5 seconds (the default) before being offered again.
+ Filters acceptFilters;
+ acceptFilters.set_refuse_seconds(0);
+
+ driver.acceptOffers(
+ {rawDiskOffers->at(0).id()},
+ {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+ acceptFilters);
+
+ AWAIT_READY(volumeCreatedOffers);
+ ASSERT_FALSE(volumeCreatedOffers->empty());
+
+ Option<Resource> volume;
+
+ foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
+ if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) {
+ volume = resource;
+ break;
+ }
+ }
+
+ ASSERT_SOME(volume);
+ ASSERT_TRUE(volume->disk().source().has_id());
+ ASSERT_TRUE(volume->disk().source().has_metadata());
+ ASSERT_TRUE(volume->disk().source().has_mount());
+ ASSERT_TRUE(volume->disk().source().mount().has_root());
+ EXPECT_FALSE(path::absolute(volume->disk().source().mount().root()));
+
+ snapshot = Metrics(); // Taken after the `CREATE_VOLUME` offer arrived.
+
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.Probe/successes")));
+ EXPECT_EQ(1, snapshot.values.at( metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.Probe/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.GetPluginInfo/successes")));
+ EXPECT_EQ(2, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.GetPluginInfo/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.GetPluginCapabilities/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.GetPluginCapabilities/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.ControllerGetCapabilities/successes"))); // NOLINT(whitespace/line_length)
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.ControllerGetCapabilities/successes"))); // NOLINT(whitespace/line_length)
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.ListVolumes/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.ListVolumes/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.GetCapacity/successes")));
+ EXPECT_EQ(2, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.GetCapacity/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+ EXPECT_EQ(0, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Node.NodeGetCapabilities/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Node.NodeGetCapabilities/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Node.NodeGetId/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Node.NodeGetId/successes")));
+
+ // Remove the volume's "path" directory out of band to fail `DESTROY_VOLUME`.
+ Option<string> volumePath;
+
+ foreach (const Label& label, volume->disk().source().metadata().labels()) {
+ if (label.key() == "path") {
+ volumePath = label.value();
+ break;
+ }
+ }
+
+ ASSERT_SOME(volumePath);
+ ASSERT_SOME(os::rmdir(volumePath.get()));
+
+ // Destroy the created volume, which will fail; expect the volume back.
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(volume.get())))
+ .InSequence(offers)
+ .WillOnce(FutureArg<1>(&operationFailedOffers))
+ .WillRepeatedly(DeclineOffers(declineFilters)); // Decline further offers.
+
+ driver.acceptOffers(
+ {volumeCreatedOffers->at(0).id()},
+ {DESTROY_VOLUME(volume.get())},
+ acceptFilters);
+
+ AWAIT_READY(operationFailedOffers);
+ ASSERT_FALSE(operationFailedOffers->empty());
+
+ snapshot = Metrics(); // Taken after the failed `DESTROY_VOLUME`.
+
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.Probe/successes")));
+ EXPECT_EQ(1, snapshot.values.at( metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.Probe/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.GetPluginInfo/successes")));
+ EXPECT_EQ(2, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.GetPluginInfo/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.GetPluginCapabilities/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Identity.GetPluginCapabilities/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.ControllerGetCapabilities/successes"))); // NOLINT(whitespace/line_length)
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.ControllerGetCapabilities/successes"))); // NOLINT(whitespace/line_length)
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.ListVolumes/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.ListVolumes/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.GetCapacity/successes")));
+ EXPECT_EQ(2, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.GetCapacity/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.CreateVolume/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Controller.DeleteVolume/errors")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Node.NodeGetCapabilities/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Node.NodeGetCapabilities/successes")));
+ ASSERT_NE(0u, snapshot.values.count(metricName(
+ "csi_plugin/rpcs/csi.v0.Node.NodeGetId/successes")));
+ EXPECT_EQ(1, snapshot.values.at(metricName(
+ "csi_plugin/rpcs/csi.v0.Node.NodeGetId/successes")));
+}
+
+
// Master reconciles operations that are missing from a reregistering slave.
// In this case, the `ApplyOperationMessage` is dropped, so the resource
// provider should send OPERATION_DROPPED. Operations on agent default