You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/02/01 06:20:45 UTC

[09/12] mesos git commit: Added storage local resource provider test for agent with a new ID.

Added storage local resource provider test for agent with a new ID.

Review: https://reviews.apache.org/r/65000/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ab8e0efb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ab8e0efb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ab8e0efb

Branch: refs/heads/master
Commit: ab8e0efb9c2c5c3f6c4e9b234b57eb0ee48eabec
Parents: 53de510
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Wed Jan 31 18:34:18 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Wed Jan 31 22:14:28 2018 -0800

----------------------------------------------------------------------
 .../storage_local_resource_provider_tests.cpp   | 213 +++++++++++++++++++
 1 file changed, 213 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ab8e0efb/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index d9e9d5b..e53142b 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -25,6 +25,7 @@
 #include "module/manager.hpp"
 
 #include "slave/container_daemon_process.hpp"
+#include "slave/paths.hpp"
 
 #include "slave/containerizer/fetcher.hpp"
 
@@ -998,6 +999,218 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
 }
 
 
+// This test verifies that if an agent restarts and registers with a
+// new ID, its resource provider gets a new ID as well, and a volume
+// created earlier is offered as a RAW pre-existing volume with the same ID.
+TEST_F(StorageLocalResourceProviderTest, ROOT_AgentRegisteredWithNewId)
+{
+  loadUriDiskProfileModule();
+
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileConfig();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise operations.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // The framework is expected to see the following offers in sequence:
+  //   1. One containing a RAW disk resource before `CREATE_VOLUME`.
+  //   2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
+  //   3. One containing a RAW pre-existing volume after the agent
+  //      is registered with a new ID.
+  //
+  // We set up the expectations for these offers as the test progresses.
+  Future<vector<Offer>> rawDiskOffers;
+  Future<vector<Offer>> volumeCreatedOffers;
+  Future<vector<Offer>> slaveRecoveredOffers;
+
+  Sequence offers;
+
+  // We use the following filter to decline offers that do not have
+  // the wanted resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  // Before the agent fails over, we are interested in any storage pool
+  // or created volume which has a "volume-default" profile.
+  auto hasSourceType = [](
+      const Resource& r,
+      const Resource::DiskInfo::Source::Type& type) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == "volume-default" &&
+      r.disk().source().type() == type;
+  };
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+  driver.start();
+
+  AWAIT_READY(rawDiskOffers);
+  ASSERT_FALSE(rawDiskOffers->empty());
+
+  Option<Resource> source;
+
+  foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) {
+      source = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+
+  // Create a volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeCreatedOffers));
+
+  // Set `refuse_seconds` to 0 so that the resources will not be
+  // filtered for 5 seconds (the default).
+  Filters acceptFilters;
+  acceptFilters.set_refuse_seconds(0);
+
+  driver.acceptOffers(
+      {rawDiskOffers->at(0).id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      acceptFilters);
+
+  AWAIT_READY(volumeCreatedOffers);
+  ASSERT_FALSE(volumeCreatedOffers->empty());
+
+  Option<Resource> createdVolume;
+
+  foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) {
+      createdVolume = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(createdVolume);
+  ASSERT_TRUE(createdVolume->has_provider_id());
+  ASSERT_TRUE(createdVolume->disk().source().has_id());
+  ASSERT_TRUE(createdVolume->disk().source().has_metadata());
+  ASSERT_TRUE(createdVolume->disk().source().has_mount());
+  ASSERT_TRUE(createdVolume->disk().source().mount().has_root());
+  EXPECT_FALSE(path::absolute(createdVolume->disk().source().mount().root()));
+
+  // Check that the volume is actually created by the test CSI plugin.
+  Option<string> volumePath;
+
+  foreach (const Label& label,
+           createdVolume->disk().source().metadata().labels()) {
+    if (label.key() == "path") {
+      volumePath = label.value();
+      break;
+    }
+  }
+
+  ASSERT_SOME(volumePath);
+  EXPECT_TRUE(os::exists(volumePath.get()));
+
+  // Shut down the agent, which is expected to rescind the offer.
+  EXPECT_CALL(sched, offerRescinded(_, _));
+
+  slave.get()->terminate();
+
+  // Remove the `latest` symlink to register the agent with a new ID.
+  const string metaDir = slave::paths::getMetaRootDir(slaveFlags.work_dir);
+  ASSERT_SOME(os::rm(slave::paths::getLatestSlavePath(metaDir)));
+
+  // A new registration would trigger another `SlaveRegisteredMessage`.
+  slaveRegisteredMessage = FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  // After the agent fails over, any volume created before becomes a
+  // pre-existing volume, which has an ID but no profile.
+  auto isPreExistingVolume = [](const Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_id() &&
+      !r.disk().source().has_profile();
+  };
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      isPreExistingVolume)))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&slaveRecoveredOffers));
+
+  slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  AWAIT_READY(slaveRecoveredOffers);
+  ASSERT_FALSE(slaveRecoveredOffers->empty());
+
+  Option<Resource> preExistingVolume;
+
+  foreach (const Resource& resource, slaveRecoveredOffers->at(0).resources()) {
+    if (isPreExistingVolume(resource) &&
+        resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
+      preExistingVolume = resource;
+    }
+  }
+
+  ASSERT_SOME(preExistingVolume);
+  ASSERT_TRUE(preExistingVolume->has_provider_id());
+  ASSERT_NE(createdVolume->provider_id(), preExistingVolume->provider_id());
+  ASSERT_EQ(
+      createdVolume->disk().source().id(),
+      preExistingVolume->disk().source().id());
+}
+
+
 // This test verifies that the storage local resource provider can
 // publish a volume required by a task, then destroy the published
 // volume after the task finishes.