You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/06/01 01:32:31 UTC

[05/12] mesos git commit: Added a unit test for SLRP operation state metrics.

Added a unit test for SLRP operation state metrics.

This patch adds the `ROOT_OperationStateMetrics` test that issues a
`CREATE_VOLUME` followed by two `DESTROY_VOLUME`s. The first one will
fail due to an out-of-band deletion of the actual volume, and the second
one will be dropped due to a modified resource version UUID.

Review: https://reviews.apache.org/r/65666/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/5bdea195
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/5bdea195
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/5bdea195

Branch: refs/heads/master
Commit: 5bdea1951a63b92dacc4b97ca5dd8b2e86467f98
Parents: 70b407d
Author: Chun-Hung Hsiao <ch...@apache.org>
Authored: Thu May 17 17:45:06 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Thu May 31 18:29:56 2018 -0700

----------------------------------------------------------------------
 .../storage_local_resource_provider_tests.cpp   | 301 +++++++++++++++++--
 1 file changed, 277 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/5bdea195/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 04a75fc..3a2eec3 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -56,9 +56,9 @@ using mesos::master::detector::StandaloneMasterDetector;
 using process::Clock;
 using process::Future;
 using process::Owned;
+using process::post;
 
 using testing::AtMost;
-using testing::DoAll;
 using testing::Not;
 using testing::Sequence;
 
@@ -265,6 +265,12 @@ public:
     ASSERT_SOME(write);
   }
 
+  string metricName(const string& basename)
+  {
+    return "resource_providers/" + stringify(TEST_SLRP_TYPE) + "." +
+      stringify(TEST_SLRP_NAME) + "/" + basename;
+  }
+
 protected:
   Modules modules;
   vector<string> slaveWorkDirs;
@@ -2783,9 +2789,9 @@ TEST_F(
 }
 
 
-// This test verifies that storage local resource provider metrics are
-// properly reported.
-TEST_F(StorageLocalResourceProviderTest, ROOT_Metrics)
+// This test verifies that the storage local resource provider properly
+// reports metrics related to CSI plugin terminations.
+TEST_F(StorageLocalResourceProviderTest, ROOT_PluginTerminationMetrics)
 {
   setupResourceProviderConfig(Gigabytes(4));
 
@@ -2820,20 +2826,16 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_Metrics)
 
   AWAIT_READY(pluginConnected);
 
-  const string prefix =
-    "resource_providers/" + stringify(TEST_SLRP_TYPE) +
-    "." + stringify(TEST_SLRP_NAME) + "/";
-
   JSON::Object snapshot = Metrics();
 
-  ASSERT_NE(0u, snapshot.values.count(
-      prefix + "csi_controller_plugin_terminations"));
-  EXPECT_EQ(0, snapshot.values.at(
-      prefix + "csi_controller_plugin_terminations"));
-  ASSERT_NE(0u, snapshot.values.count(
-      prefix + "csi_node_plugin_terminations"));
-  EXPECT_EQ(0, snapshot.values.at(
-      prefix + "csi_node_plugin_terminations"));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_controller_plugin_terminations")));
+  EXPECT_EQ(0, snapshot.values.at(metricName(
+      "csi_controller_plugin_terminations")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_node_plugin_terminations")));
+  EXPECT_EQ(0, snapshot.values.at(metricName(
+      "csi_node_plugin_terminations")));
 
   // Get the ID of the CSI plugin container.
   Future<hashset<ContainerID>> pluginContainers = containerizer->containers();
@@ -2860,14 +2862,265 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_Metrics)
 
   snapshot = Metrics();
 
-  ASSERT_NE(0u, snapshot.values.count(
-      prefix + "csi_controller_plugin_terminations"));
-  EXPECT_EQ(1, snapshot.values.at(
-      prefix + "csi_controller_plugin_terminations"));
-  ASSERT_NE(0u, snapshot.values.count(
-      prefix + "csi_node_plugin_terminations"));
-  EXPECT_EQ(1, snapshot.values.at(
-      prefix + "csi_node_plugin_terminations"));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_controller_plugin_terminations")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_controller_plugin_terminations")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "csi_node_plugin_terminations")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "csi_node_plugin_terminations")));
+}
+
+
+// This test verifies that the storage local resource provider properly
+// reports metrics related to operation states.
+// TODO(chhsiao): Currently there is no way to test the `pending` metric for
+// operations since we have no control over the completion of an operation. Once
+// we support out-of-band CSI plugins through domain sockets, we could test this
+// metric against a mock CSI plugin.
+TEST_F(StorageLocalResourceProviderTest, ROOT_OperationStateMetrics)
+{
+  loadUriDiskProfileAdaptorModule();
+
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileMapping();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
+
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise operations.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // The framework is expected to see the following offers in sequence:
+  //   1. One containing a RAW disk resource before `CREATE_VOLUME`.
+  //   2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
+  //   3. One containing the same MOUNT disk resource after a failed
+  //      `DESTROY_VOLUME`.
+  //
+  // We set up the expectations for these offers as the test progresses.
+  Future<vector<Offer>> rawDiskOffers;
+  Future<vector<Offer>> volumeCreatedOffers;
+  Future<vector<Offer>> operationFailedOffers;
+
+  Sequence offers;
+
+  // We use the following filter to decline offers that do not contain the
+  // wanted resources, filtering them out for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  // We are only interested in any storage pool or created volume which
+  // has a "volume-default" profile.
+  auto hasSourceType = [](
+      const Resource& r,
+      const Resource::DiskInfo::Source::Type& type) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == "volume-default" &&
+      r.disk().source().type() == type;
+  };
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+  driver.start();
+
+  AWAIT_READY(rawDiskOffers);
+  ASSERT_FALSE(rawDiskOffers->empty());
+
+  Option<Resource> source;
+
+  foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) {
+      source = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+
+  JSON::Object snapshot = Metrics();
+
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "operations/create_volume/finished")));
+  EXPECT_EQ(0, snapshot.values.at(metricName(
+      "operations/create_volume/finished")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "operations/destroy_volume/failed")));
+  EXPECT_EQ(0, snapshot.values.at(metricName(
+      "operations/destroy_volume/failed")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "operations/destroy_volume/dropped")));
+  EXPECT_EQ(0, snapshot.values.at(metricName(
+      "operations/destroy_volume/dropped")));
+
+  // Create a volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeCreatedOffers));
+
+  // We use the following filter so that the resources are not filtered
+  // out for the default duration of 5 seconds.
+  Filters acceptFilters;
+  acceptFilters.set_refuse_seconds(0);
+
+  driver.acceptOffers(
+      {rawDiskOffers->at(0).id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      acceptFilters);
+
+  AWAIT_READY(volumeCreatedOffers);
+  ASSERT_FALSE(volumeCreatedOffers->empty());
+
+  Option<Resource> volume;
+
+  foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) {
+      volume = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(volume);
+  ASSERT_TRUE(volume->disk().source().has_id());
+  ASSERT_TRUE(volume->disk().source().has_metadata());
+  ASSERT_TRUE(volume->disk().source().has_mount());
+  ASSERT_TRUE(volume->disk().source().mount().has_root());
+  EXPECT_FALSE(path::absolute(volume->disk().source().mount().root()));
+
+  snapshot = Metrics();
+
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "operations/create_volume/finished")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "operations/create_volume/finished")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "operations/destroy_volume/failed")));
+  EXPECT_EQ(0, snapshot.values.at(metricName(
+      "operations/destroy_volume/failed")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "operations/destroy_volume/dropped")));
+  EXPECT_EQ(0, snapshot.values.at(metricName(
+      "operations/destroy_volume/dropped")));
+
+  // Remove the volume out of band to fail `DESTROY_VOLUME`.
+  Option<string> volumePath;
+
+  foreach (const Label& label, volume->disk().source().metadata().labels()) {
+    if (label.key() == "path") {
+      volumePath = label.value();
+      break;
+    }
+  }
+
+  ASSERT_SOME(volumePath);
+  ASSERT_SOME(os::rmdir(volumePath.get()));
+
+  // Destroy the created volume, which will fail.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(volume.get())))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&operationFailedOffers))
+    .WillRepeatedly(DeclineOffers(declineFilters)); // Decline further offers.
+
+  driver.acceptOffers(
+      {volumeCreatedOffers->at(0).id()},
+      {DESTROY_VOLUME(volume.get())},
+      acceptFilters);
+
+  AWAIT_READY(operationFailedOffers);
+  ASSERT_FALSE(operationFailedOffers->empty());
+
+  snapshot = Metrics();
+
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "operations/create_volume/finished")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "operations/create_volume/finished")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "operations/destroy_volume/failed")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "operations/destroy_volume/failed")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "operations/destroy_volume/dropped")));
+  EXPECT_EQ(0, snapshot.values.at(metricName(
+      "operations/destroy_volume/dropped")));
+
+  // Destroy the volume again, which will be dropped this time.
+  Future<ApplyOperationMessage> applyOperationMessage =
+    DROP_PROTOBUF(ApplyOperationMessage(), _, _);
+
+  driver.acceptOffers(
+      {operationFailedOffers->at(0).id()},
+      {DESTROY_VOLUME(volume.get())},
+      acceptFilters);
+
+  AWAIT_READY(applyOperationMessage);
+  ASSERT_TRUE(applyOperationMessage
+    ->resource_version_uuid().has_resource_provider_id());
+
+  // Modify the resource version UUID to drop `DESTROY_VOLUME`.
+  Future<UpdateOperationStatusMessage> operationDroppedStatus =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  ApplyOperationMessage spoofedApplyOperationMessage =
+    applyOperationMessage.get();
+  spoofedApplyOperationMessage.mutable_resource_version_uuid()->mutable_uuid()
+    ->set_value(id::UUID::random().toBytes());
+
+  post(master.get()->pid, slave.get()->pid, spoofedApplyOperationMessage);
+
+  AWAIT_READY(operationDroppedStatus);
+  EXPECT_EQ(OPERATION_DROPPED, operationDroppedStatus->status().state());
+
+  snapshot = Metrics();
+
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "operations/create_volume/finished")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "operations/create_volume/finished")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "operations/destroy_volume/failed")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "operations/destroy_volume/failed")));
+  ASSERT_NE(0u, snapshot.values.count(metricName(
+      "operations/destroy_volume/dropped")));
+  EXPECT_EQ(1, snapshot.values.at(metricName(
+      "operations/destroy_volume/dropped")));
 }