You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/01/20 00:44:11 UTC

[1/7] mesos git commit: Added SLRP unit tests for profile updates and corner cases.

Repository: mesos
Updated Branches:
  refs/heads/master ddde32525 -> 336e93219


Added SLRP unit tests for profile updates and corner cases.

This patch adds the following storage local resource provider tests:

`NoResource`: The resource provider (RP) updates its state with no
resources, and can recover from a checkpointed state that contains no
resources.

`ZeroSizedDisk`: The CSI plugin reports a pre-existing volume with
zero capacity.

`SmallDisk`: The CSI plugin reports a storage pool and a pre-existing
volume, each smaller than 1MB.

`NewProfile`: A new profile is added after the RP has updated its state.

Review: https://reviews.apache.org/r/64992/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4924443f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4924443f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4924443f

Branch: refs/heads/master
Commit: 4924443fc6d1b44c33e3403564746bd964e18703
Parents: ddde325
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Jan 19 15:36:22 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:48:43 2018 -0800

----------------------------------------------------------------------
 src/examples/test_csi_plugin.cpp                |   9 +-
 .../storage_local_resource_provider_tests.cpp   | 530 +++++++++++++++++--
 2 files changed, 487 insertions(+), 52 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/4924443f/src/examples/test_csi_plugin.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index f6b2c98..0f65c40 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -588,7 +588,12 @@ Status TestCSIPlugin::GetCapacity(
 
   foreach (const csi::VolumeCapability& capability,
            request->volume_capabilities()) {
-    if (!capability.has_mount()) {
+    // We report zero capacity for any capability other than the
+    // default-constructed `MountVolume` capability since this plugin
+    // does not support any filesystem types and mount flags.
+    if (!capability.has_mount() ||
+        !capability.mount().fs_type().empty() ||
+        !capability.mount().mount_flags().empty()) {
       response->set_available_capacity(0);
 
       return Status::OK;
@@ -925,8 +930,6 @@ int main(int argc, char** argv)
         Try<Bytes> capacity = Bytes::parse(pair[1]);
         if (capacity.isError()) {
           error = capacity.error();
-        } else if (capacity.get() == 0) {
-          error = "Volume capacity cannot be zero";
         } else {
           volumes.put(pair[0], capacity.get());
         }

http://git-wip-us.apache.org/repos/asf/mesos/blob/4924443f/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index bbfe95e..d98b914 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -14,6 +14,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+#include <process/clock.hpp>
 #include <process/future.hpp>
 #include <process/gtest.hpp>
 #include <process/gmock.hpp>
@@ -32,6 +33,9 @@ using std::vector;
 
 using mesos::master::detector::MasterDetector;
 
+using mesos::v1::resource_provider::Call;
+
+using process::Clock;
 using process::Future;
 using process::Owned;
 
@@ -54,15 +58,57 @@ public:
   {
     MesosTest::SetUp();
 
-    const string testCsiPluginWorkDir = path::join(sandbox.get(), "test");
+    testCsiPluginWorkDir = path::join(sandbox.get(), "test");
     ASSERT_SOME(os::mkdir(testCsiPluginWorkDir));
 
     resourceProviderConfigDir =
       path::join(sandbox.get(), "resource_provider_configs");
-
     ASSERT_SOME(os::mkdir(resourceProviderConfigDir));
 
-    string testCsiPluginPath =
+    uriDiskProfileConfigPath =
+      path::join(sandbox.get(), "disk_profiles.json");
+  }
+
+  virtual void TearDown()
+  {
+    // Unload modules.
+    foreach (const Modules::Library& library, modules.libraries()) {
+      foreach (const Modules::Library::Module& module, library.modules()) {
+        if (module.has_name()) {
+          ASSERT_SOME(modules::ModuleManager::unload(module.name()));
+        }
+      }
+    }
+
+    MesosTest::TearDown();
+  }
+
+  void loadUriDiskProfileModule()
+  {
+    const string libraryPath = getModulePath("uri_disk_profile");
+
+    Modules::Library* library = modules.add_libraries();
+    library->set_name("uri_disk_profile");
+    library->set_file(libraryPath);
+
+    Modules::Library::Module* module = library->add_modules();
+    module->set_name(URI_DISK_PROFILE_ADAPTOR_NAME);
+
+    Parameter* uri = module->add_parameters();
+    uri->set_key("uri");
+    uri->set_value(uriDiskProfileConfigPath);
+    Parameter* pollInterval = module->add_parameters();
+    pollInterval->set_key("poll_interval");
+    pollInterval->set_value("1secs");
+
+    ASSERT_SOME(modules::ModuleManager::load(modules));
+  }
+
+  void setupResourceProviderConfig(
+      const Bytes& capacity,
+      const Option<string> volumes = None())
+  {
+    const string testCsiPluginPath =
       path::join(tests::flags.build_dir, "src", "test-csi-plugin");
 
     Try<string> resourceProviderConfig = strings::format(
@@ -91,8 +137,8 @@ public:
                     "value": "%s",
                     "arguments": [
                       "%s",
-                      "--available_capacity=2GB",
-                      "--volumes=volume1:1GB;volume2:1GB",
+                      "--available_capacity=%s",
+                      "--volumes=%s",
                       "--work_dir=%s"
                     ]
                   }
@@ -104,6 +150,8 @@ public:
         )~",
         testCsiPluginPath,
         testCsiPluginPath,
+        stringify(capacity),
+        volumes.getOrElse(""),
         testCsiPluginWorkDir);
 
     ASSERT_SOME(resourceProviderConfig);
@@ -111,19 +159,29 @@ public:
     ASSERT_SOME(os::write(
         path::join(resourceProviderConfigDir, "test.json"),
         resourceProviderConfig.get()));
+  }
 
-    uriDiskProfileConfigPath =
-      path::join(sandbox.get(), "disk_profiles.json");
-
+  void setupDiskProfileConfig()
+  {
     Try<Nothing> write = os::write(
         uriDiskProfileConfigPath,
         R"~(
         {
           "profile_matrix": {
-            "default" : {
-              "volume_capabilities" : {
-                "mount" : {},
-                "access_mode" : { "mode" : "SINGLE_NODE_WRITER" }
+            "volume-default": {
+              "volume_capabilities": {
+                "mount": {},
+                "access_mode": {
+                  "mode": "SINGLE_NODE_WRITER"
+                }
+              }
+            },
+            "block-default": {
+              "volume_capabilities": {
+                "block": {},
+                "access_mode": {
+                  "mode": "SINGLE_NODE_WRITER"
+                }
               }
             }
           }
@@ -133,43 +191,404 @@ public:
     ASSERT_SOME(write);
   }
 
-  virtual void TearDown()
-  {
-    // Unload modules.
-    foreach (const Modules::Library& library, modules.libraries()) {
-      foreach (const Modules::Library::Module& module, library.modules()) {
-        if (module.has_name()) {
-          ASSERT_SOME(modules::ModuleManager::unload(module.name()));
-        }
-      }
+protected:
+  Modules modules;
+  string resourceProviderConfigDir;
+  string testCsiPluginWorkDir;
+  string uriDiskProfileConfigPath;
+};
+
+
+// This test verifies that a storage local resource provider can report
+// no resource and recover from this state.
+TEST_F(StorageLocalResourceProviderTest, ROOT_NoResource)
+{
+  Clock::pause();
+
+  setupResourceProviderConfig(Bytes(0));
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s are reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration and prevent retry.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave1);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // periodically check if the CSI endpoint socket has been created by
+  // the plugin container, which runs in another Linux process.
+  Clock::resume();
+
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+  ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+  EXPECT_EQ(
+      0,
+      updateSlave2->resource_providers().providers(0).total_resources_size());
+
+  Clock::pause();
+
+  // Restart the agent.
+  slave.get()->terminate();
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s are reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave4 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave3 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration and prevent retry.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave3);
+
+  Clock::resume();
+
+  AWAIT_READY(updateSlave4);
+  ASSERT_TRUE(updateSlave4->has_resource_providers());
+  ASSERT_EQ(1, updateSlave4->resource_providers().providers_size());
+  EXPECT_EQ(
+      0,
+      updateSlave4->resource_providers().providers(0).total_resources_size());
+}
+
+
+// This test verifies that any zero-sized volume reported by a CSI
+// plugin will be ignored by the storage local resource provider.
+TEST_F(StorageLocalResourceProviderTest, ROOT_ZeroSizedDisk)
+{
+  Clock::pause();
+
+  setupResourceProviderConfig(Bytes(0), "volume0:0B");
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s are reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration and prevent retry.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave1);
+
+  Clock::resume();
+
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+  ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+  Option<Resource> volume;
+  foreach (const Resource& resource,
+           updateSlave2->resource_providers().providers(0).total_resources()) {
+    if (Resources::hasResourceProvider(resource)) {
+      volume = resource;
     }
+  }
 
-    MesosTest::TearDown();
+  ASSERT_NONE(volume);
+}
+
+
+// This test verifies that the storage local resource provider can
+// handle disks less than 1MB correctly.
+TEST_F(StorageLocalResourceProviderTest, ROOT_SmallDisk)
+{
+  loadUriDiskProfileModule();
+
+  setupResourceProviderConfig(Kilobytes(512), "volume0:512KB");
+  setupDiskProfileConfig();
+
+  master::Flags masterFlags = CreateMasterFlags();
+
+  // Use a small allocation interval to speed up the test. We do this
+  // instead of manipulating the clock to keep the test concise and
+  // avoid waiting for `UpdateSlaveMessage`s and pausing/resuming the
+  // clock multiple times.
+  masterFlags.allocation_interval = Milliseconds(50);
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to receive offers.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  // We use the following filter to filter offers that do not have
+  // wanted resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> rawDisksOffers;
+
+  // We are interested in offers that contains both the storage pool and
+  // the pre-existing volume.
+  auto isStoragePool = [](const Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::RAW &&
+      !r.disk().source().has_id() &&
+      r.disk().source().has_profile();
+  };
+
+  auto isPreExistingVolume = [](const Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_id() &&
+      !r.disk().source().has_profile();
+  };
+
+  // Since the master may send out offers before the resource provider
+  // reports the storage pool, we decline offers that do not have any
+  // storage pool. This would also decline offers that contain only the
+  // agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      isStoragePool)))
+    .WillOnce(FutureArg<1>(&rawDisksOffers));
+
+  driver.start();
+
+  AWAIT_READY(rawDisksOffers);
+  ASSERT_FALSE(rawDisksOffers->empty());
+
+  Option<Resource> storagePool;
+  Option<Resource> preExistingVolume;
+  foreach (const Resource& resource, rawDisksOffers->at(0).resources()) {
+    if (isStoragePool(resource)) {
+      storagePool = resource;
+    } else if (isPreExistingVolume(resource)) {
+      preExistingVolume = resource;
+    }
   }
 
-  void loadUriDiskProfileModule()
-  {
-    string libraryPath = getModulePath("uri_disk_profile");
+  ASSERT_SOME(storagePool);
+  EXPECT_EQ(
+      Kilobytes(512),
+      Bytes(storagePool->scalar().value() * Bytes::MEGABYTES));
 
-    Modules::Library* library = modules.add_libraries();
-    library->set_name("uri_disk_profile");
-    library->set_file(libraryPath);
+  ASSERT_SOME(preExistingVolume);
+  EXPECT_EQ(
+      Kilobytes(512),
+      Bytes(preExistingVolume->scalar().value() * Bytes::MEGABYTES));
+}
 
-    Modules::Library::Module* module = library->add_modules();
-    module->set_name(URI_DISK_PROFILE_ADAPTOR_NAME);
 
-    Parameter* parameter = module->add_parameters();
-    parameter->set_key("uri");
-    parameter->set_value(uriDiskProfileConfigPath);
+// This test verifies that a framework can receive offers having new
+// storage pools from the storage local resource provider due to
+// adding new profiles.
+TEST_F(StorageLocalResourceProviderTest, ROOT_NewProfile)
+{
+  Clock::pause();
 
-    ASSERT_SOME(modules::ModuleManager::load(modules));
+  loadUriDiskProfileModule();
+
+  setupResourceProviderConfig(Gigabytes(4));
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s are reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration and prevent retry.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave1);
+
+  Clock::resume();
+
+  // No resource should be reported by the resource provider before
+  // adding any profile.
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+  ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+  EXPECT_EQ(
+      0,
+      updateSlave2->resource_providers().providers(0).total_resources_size());
+
+  Future<UpdateSlaveMessage> updateSlave3 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  // Add new profiles.
+  setupDiskProfileConfig();
+
+  // A new storage pool for profile "volume-default" should be reported
+  // by the resource provider. Still expect no storage pool for
+  // "block-default" since it is not supported by the test CSI plugin.
+  AWAIT_READY(updateSlave3);
+  ASSERT_TRUE(updateSlave3->has_resource_providers());
+  ASSERT_EQ(1, updateSlave3->resource_providers().providers_size());
+  EXPECT_EQ(
+      1,
+      updateSlave3->resource_providers().providers(0).total_resources_size());
+
+  Option<Resource> volumeStoragePool;
+  Option<Resource> blockStoragePool;
+  foreach (const Resource& resource,
+           updateSlave3->resource_providers().providers(0).total_resources()) {
+    if (!resource.has_disk() ||
+        !resource.disk().has_source() ||
+        resource.disk().source().type() != Resource::DiskInfo::Source::RAW ||
+        !resource.disk().source().has_profile() ||
+        resource.disk().source().has_id()) {
+      continue;
+    }
+
+    if (resource.disk().source().profile() == "volume-default") {
+      volumeStoragePool = resource;
+    } else if (resource.disk().source().profile() == "block-default") {
+      blockStoragePool = resource;
+    }
   }
 
-protected:
-  Modules modules;
-  string resourceProviderConfigDir;
-  string uriDiskProfileConfigPath;
-};
+  EXPECT_SOME(volumeStoragePool);
+  EXPECT_NONE(blockStoragePool);
+}
 
 
 // This test verifies that a framework can create then destroy a new
@@ -179,6 +598,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolume)
 {
   loadUriDiskProfileModule();
 
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileConfig();
+
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
@@ -237,14 +659,14 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolume)
   Sequence offers;
 
   // We are only interested in storage pools and volume created from
-  // them, which have a "default" profile.
+  // them, which have a "volume-default" profile.
   auto hasSourceType = [](
       const Resource& r,
       const Resource::DiskInfo::Source::Type& type) {
     return r.has_disk() &&
       r.disk().has_source() &&
       r.disk().source().has_profile() &&
-      r.disk().source().profile() == "default" &&
+      r.disk().source().profile() == "volume-default" &&
       r.disk().source().type() == type;
   };
 
@@ -356,6 +778,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolumeRecovery)
 {
   loadUriDiskProfileModule();
 
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileConfig();
+
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
@@ -417,14 +842,14 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolumeRecovery)
   Sequence offers;
 
   // We are only interested in storage pools and volume created from
-  // them, which have a "default" profile.
+  // them, which have a "volume-default" profile.
   auto hasSourceType = [](
       const Resource& r,
       const Resource::DiskInfo::Source::Type& type) {
     return r.has_disk() &&
       r.disk().has_source() &&
       r.disk().source().has_profile() &&
-      r.disk().source().profile() == "default" &&
+      r.disk().source().profile() == "volume-default" &&
       r.disk().source().type() == type;
   };
 
@@ -549,6 +974,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTask)
 {
   loadUriDiskProfileModule();
 
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileConfig();
+
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
@@ -612,14 +1040,14 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTask)
   Sequence offers;
 
   // We are only interested in storage pools and volume created from
-  // them, which have a "default" profile.
+  // them, which have a "volume-default" profile.
   auto hasSourceType = [](
       const Resource& r,
       const Resource::DiskInfo::Source::Type& type) {
     return r.has_disk() &&
       r.disk().has_source() &&
       r.disk().source().has_profile() &&
-      r.disk().source().profile() == "default" &&
+      r.disk().source().profile() == "volume-default" &&
       r.disk().source().type() == type;
   };
 
@@ -765,6 +1193,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTaskRecovery)
 {
   loadUriDiskProfileModule();
 
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileConfig();
+
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
@@ -829,14 +1260,14 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTaskRecovery)
   Sequence offers;
 
   // We are only interested in storage pools and volume created from
-  // them, which have a "default" profile.
+  // them, which have a "volume-default" profile.
   auto hasSourceType = [](
       const Resource& r,
       const Resource::DiskInfo::Source::Type& type) {
     return r.has_disk() &&
       r.disk().has_source() &&
       r.disk().source().has_profile() &&
-      r.disk().source().profile() == "default" &&
+      r.disk().source().profile() == "volume-default" &&
       r.disk().source().type() == type;
   };
 
@@ -991,8 +1422,10 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTaskRecovery)
 // This test verifies that a framework can convert pre-existing volumes
 // from a storage local resource provider that uses the test CSI plugin
 // into mount or block volumes.
-TEST_F(StorageLocalResourceProviderTest, ROOT_PreExistingVolume)
+TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
 {
+  setupResourceProviderConfig(Bytes(0), "volume1:2GB;volume2:2GB");
+
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
@@ -1157,7 +1590,6 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PreExistingVolume)
   }
 }
 
-
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[6/7] mesos git commit: Tested that operation updates dropped en route to master are resent.

Posted by gr...@apache.org.
Tested that operation updates dropped en route to master are resent.

This patch adds
`StorageLocalResourceProviderTest.ROOT_RetryOperationStatusUpdate`,
which verifies that operation status updates are resent by the
agent after being dropped en route to the master.

Review: https://reviews.apache.org/r/65057/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/60f23d87
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/60f23d87
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/60f23d87

Branch: refs/heads/master
Commit: 60f23d870080c5d70963857cb06a50cf0d2825fb
Parents: 434ef5f
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Fri Jan 19 15:36:31 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:50:32 2018 -0800

----------------------------------------------------------------------
 .../storage_local_resource_provider_tests.cpp   | 161 +++++++++++++++++++
 1 file changed, 161 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/60f23d87/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 1b21527..f6d093a 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -1916,6 +1916,167 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
   }
 }
 
+
+// This test verifies that operation status updates are resent to the master
+// after being dropped en route to it.
+//
+// To accomplish this:
+//   1. Creates a volume from a RAW disk resource.
+//   2. Drops the first `UpdateOperationStatusMessage` from the agent to the
+//      master, so that it isn't acknowledged by the master.
+//   3. Advances the clock and verifies that the agent resends the operation
+//      status update.
+TEST_F(StorageLocalResourceProviderTest, ROOT_RetryOperationStatusUpdate)
+{
+  Clock::pause();
+
+  loadUriDiskProfileModule();
+
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileConfig();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  flags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  flags.agent_features = SlaveCapabilities();
+  flags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  flags.resource_provider_config_dir = resourceProviderConfigDir;
+  flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  //
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(flags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave1);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // periodically check if the CSI endpoint socket has been created by
+  // the plugin container, which runs in another Linux process.
+  Clock::resume();
+
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+  ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+  Clock::pause();
+
+  // Register a framework to exercise an operation.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // Returns true if `r` is a RAW disk resource that has a profile.
+  auto isRaw = [](
+      const Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::RAW;
+  };
+
+  // Decline every offer that is not matched by a more specific expectation.
+  //
+  // This catch-all must be installed before the driver starts. Google Mock
+  // fails the test on a call that matches no expectation. Once the clock is
+  // advanced below, the allocator can re-offer the agent's remaining
+  // resources, or the new `MOUNT` disk once the operation status update is
+  // acknowledged. Such offers may reach the scheduler at any point after that.
+  //
+  // NOTE: Google Mock searches the expectations in reverse order, so the
+  // RAW-disk expectation declared below takes precedence over this one.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers());
+
+  Future<vector<Offer>> offers;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isRaw, lambda::_1))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  const Offer& offer = offers->at(0);
+
+  Option<Resource> source;
+  foreach (const Resource& resource, offer.resources()) {
+    if (isRaw(resource)) {
+      source = resource;
+      break;
+    }
+  }
+  ASSERT_SOME(source);
+
+  // We'll drop the first operation status update from the agent to the master.
+  Future<UpdateOperationStatusMessage> droppedUpdateOperationStatusMessage =
+    DROP_PROTOBUF(
+        UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid);
+
+  // Create a volume.
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      {});
+
+  AWAIT_READY(droppedUpdateOperationStatusMessage);
+
+  // The SLRP should resend the dropped operation status update after the
+  // status update retry interval minimum.
+  Future<UpdateOperationStatusMessage> retriedUpdateOperationStatusMessage =
+    FUTURE_PROTOBUF(
+        UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid);
+
+  // The master should acknowledge the operation status update.
+  Future<AcknowledgeOperationStatusMessage> acknowledgeOperationStatusMessage =
+    FUTURE_PROTOBUF(
+      AcknowledgeOperationStatusMessage(), master.get()->pid, slave.get()->pid);
+
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+  AWAIT_READY(retriedUpdateOperationStatusMessage);
+  AWAIT_READY(acknowledgeOperationStatusMessage);
+
+  // The master acknowledged the operation status update, so the SLRP shouldn't
+  // send further operation status updates.
+  EXPECT_NO_FUTURE_PROTOBUFS(UpdateOperationStatusMessage(), _, _);
+
+  // Advance past another retry interval so that an erroneous resend would be
+  // caught by the expectation above. Any offers made in the meantime, e.g.,
+  // one containing the new `MOUNT` disk, are declined by the catch-all
+  // `resourceOffers` expectation installed before the driver was started.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[3/7] mesos git commit: Sped up storage local resource provider unit tests.

Posted by gr...@apache.org.
Sped up storage local resource provider unit tests.

This patch makes all storage local resource provider unit tests use an
allocation interval of 50ms and decline offers with filters.

Review: https://reviews.apache.org/r/65059/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0e4d6f2f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0e4d6f2f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0e4d6f2f

Branch: refs/heads/master
Commit: 0e4d6f2ff8c0ebaa1a43e87ceab47108fd2eb31e
Parents: a0283a9
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Jan 19 15:36:26 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:49:42 2018 -0800

----------------------------------------------------------------------
 .../storage_local_resource_provider_tests.cpp   | 212 +++++++++++--------
 1 file changed, 126 insertions(+), 86 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0e4d6f2f/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index d5ec225..dfe4faf 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -40,7 +40,7 @@ using process::Future;
 using process::Owned;
 
 using testing::Args;
-using testing::AtMost;
+using testing::AtLeast;
 using testing::Sequence;
 
 namespace mesos {
@@ -600,16 +600,19 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
   setupResourceProviderConfig(Gigabytes(4));
   setupDiskProfileConfig();
 
-  Try<Owned<cluster::Master>> master = StartMaster();
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
 
-  slave::Flags flags = CreateSlaveFlags();
-  flags.isolation = "filesystem/linux";
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
 
   // Disable HTTP authentication to simplify resource provider interactions.
-  flags.authenticate_http_readwrite = false;
+  slaveFlags.authenticate_http_readwrite = false;
 
   // Set the resource provider capability.
   vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
@@ -617,17 +620,17 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
   capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
   capabilities.push_back(capability);
 
-  flags.agent_features = SlaveCapabilities();
-  flags.agent_features->mutable_capabilities()->CopyFrom(
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
       {capabilities.begin(), capabilities.end()});
 
-  flags.resource_provider_config_dir = resourceProviderConfigDir;
-  flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
 
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(slave);
 
   AWAIT_READY(slaveRegisteredMessage);
@@ -640,10 +643,15 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
   MesosSchedulerDriver driver(
       &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
 
-  // We use the filter explicitly here so that the resources will not
-  // be filtered for 5 seconds (the default).
-  Filters filters;
-  filters.set_refuse_seconds(0);
+  // We use the following filter so that the resources will not be
+  // filtered for 5 seconds (the default).
+  Filters acceptFilters;
+  acceptFilters.set_refuse_seconds(0);
+
+  // We use the following filter to filter offers that do not have
+  // wanted resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
@@ -671,7 +679,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
 
   // Decline offers that contain only the agent's default resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillRepeatedly(DeclineOffers());
+    .WillRepeatedly(DeclineOffers(declineFilters));
 
   EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
       std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
@@ -708,7 +716,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
   driver.acceptOffers(
       {rawDiskOffers->at(0).id()},
       {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
-      filters);
+      acceptFilters);
 
   AWAIT_READY(volumeCreatedOffers);
   ASSERT_FALSE(volumeCreatedOffers->empty());
@@ -746,7 +754,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
   driver.acceptOffers(
       {volumeCreatedOffers->at(0).id()},
       {DESTROY_VOLUME(volume.get())},
-      filters);
+      acceptFilters);
 
   AWAIT_READY(volumeDestroyedOffers);
   ASSERT_FALSE(volumeDestroyedOffers->empty());
@@ -779,16 +787,19 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
   setupResourceProviderConfig(Gigabytes(4));
   setupDiskProfileConfig();
 
-  Try<Owned<cluster::Master>> master = StartMaster();
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
 
-  slave::Flags flags = CreateSlaveFlags();
-  flags.isolation = "filesystem/linux";
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
 
   // Disable HTTP authentication to simplify resource provider interactions.
-  flags.authenticate_http_readwrite = false;
+  slaveFlags.authenticate_http_readwrite = false;
 
   // Set the resource provider capability.
   vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
@@ -796,17 +807,17 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
   capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
   capabilities.push_back(capability);
 
-  flags.agent_features = SlaveCapabilities();
-  flags.agent_features->mutable_capabilities()->CopyFrom(
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
       {capabilities.begin(), capabilities.end()});
 
-  flags.resource_provider_config_dir = resourceProviderConfigDir;
-  flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
 
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(slave);
 
   AWAIT_READY(slaveRegisteredMessage);
@@ -819,10 +830,15 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
   MesosSchedulerDriver driver(
       &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
 
-  // We use the filter explicitly here so that the resources will not
-  // be filtered for 5 seconds (the default).
-  Filters filters;
-  filters.set_refuse_seconds(0);
+  // We use the following filter so that the resources will not be
+  // filtered for 5 seconds (the default).
+  Filters acceptFilters;
+  acceptFilters.set_refuse_seconds(0);
+
+  // We use the following filter to filter offers that do not have
+  // wanted resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
@@ -853,7 +869,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
 
   // Decline offers that contain only the agent's default resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillRepeatedly(DeclineOffers());
+    .WillRepeatedly(DeclineOffers(declineFilters));
 
   EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
       std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
@@ -872,7 +888,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
     .WillOnce(FutureArg<1>(&volumeDestroyedOffers));
 
   EXPECT_CALL(sched, offerRescinded(_, _))
-    .Times(AtMost(1));
+    .Times(AtLeast(1));
 
   driver.start();
 
@@ -894,7 +910,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
   driver.acceptOffers(
       {rawDiskOffers->at(0).id()},
       {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
-      filters);
+      acceptFilters);
 
   AWAIT_READY(volumeCreatedOffers);
   ASSERT_FALSE(volumeCreatedOffers->empty());
@@ -931,7 +947,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
   // Restart the agent.
   slave.get()->terminate();
 
-  slave = StartSlave(detector.get(), flags);
+  slave = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(slave);
 
   AWAIT_READY(agentRecoveredOffers);
@@ -941,7 +957,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
   driver.acceptOffers(
       {agentRecoveredOffers->at(0).id()},
       {DESTROY_VOLUME(volume.get())},
-      filters);
+      acceptFilters);
 
   AWAIT_READY(volumeDestroyedOffers);
   ASSERT_FALSE(volumeDestroyedOffers->empty());
@@ -975,16 +991,19 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
   setupResourceProviderConfig(Gigabytes(4));
   setupDiskProfileConfig();
 
-  Try<Owned<cluster::Master>> master = StartMaster();
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
 
-  slave::Flags flags = CreateSlaveFlags();
-  flags.isolation = "filesystem/linux";
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
 
   // Disable HTTP authentication to simplify resource provider interactions.
-  flags.authenticate_http_readwrite = false;
+  slaveFlags.authenticate_http_readwrite = false;
 
   // Set the resource provider capability.
   vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
@@ -992,17 +1011,17 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
   capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
   capabilities.push_back(capability);
 
-  flags.agent_features = SlaveCapabilities();
-  flags.agent_features->mutable_capabilities()->CopyFrom(
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
       {capabilities.begin(), capabilities.end()});
 
-  flags.resource_provider_config_dir = resourceProviderConfigDir;
-  flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
 
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(slave);
 
   AWAIT_READY(slaveRegisteredMessage);
@@ -1015,10 +1034,15 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
   MesosSchedulerDriver driver(
       &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
 
-  // We use the filter explicitly here so that the resources will not
-  // be filtered for 5 seconds (the default).
-  Filters filters;
-  filters.set_refuse_seconds(0);
+  // We use the following filter so that the resources will not be
+  // filtered for 5 seconds (the default).
+  Filters acceptFilters;
+  acceptFilters.set_refuse_seconds(0);
+
+  // We use the following filter to filter offers that do not have
+  // wanted resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
@@ -1051,7 +1075,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
 
   // Decline offers that contain only the agent's default resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillRepeatedly(DeclineOffers());
+    .WillRepeatedly(DeclineOffers(declineFilters));
 
   EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
       std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
@@ -1083,7 +1107,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
   driver.acceptOffers(
       {rawDiskOffers->at(0).id()},
       {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
-      filters);
+      acceptFilters);
 
   AWAIT_READY(volumeCreatedOffers);
   ASSERT_FALSE(volumeCreatedOffers->empty());
@@ -1152,7 +1176,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
            volumeCreatedOffers->at(0).slave_id(),
            persistentVolume,
            createCommandInfo("test -f " + path::join("volume", "file")))})},
-      filters);
+      acceptFilters);
 
   AWAIT_READY(taskStarting);
   EXPECT_EQ(TASK_STARTING, taskStarting->state());
@@ -1174,7 +1198,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
       {taskFinishedOffers->at(0).id()},
       {DESTROY(persistentVolume),
        DESTROY_VOLUME(volume.get())},
-      filters);
+      acceptFilters);
 
   AWAIT_READY(volumeDestroyedOffers);
   ASSERT_FALSE(volumeDestroyedOffers->empty());
@@ -1193,16 +1217,19 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
   setupResourceProviderConfig(Gigabytes(4));
   setupDiskProfileConfig();
 
-  Try<Owned<cluster::Master>> master = StartMaster();
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
 
-  slave::Flags flags = CreateSlaveFlags();
-  flags.isolation = "filesystem/linux";
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
 
   // Disable HTTP authentication to simplify resource provider interactions.
-  flags.authenticate_http_readwrite = false;
+  slaveFlags.authenticate_http_readwrite = false;
 
   // Set the resource provider capability.
   vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
@@ -1210,17 +1237,17 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
   capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
   capabilities.push_back(capability);
 
-  flags.agent_features = SlaveCapabilities();
-  flags.agent_features->mutable_capabilities()->CopyFrom(
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
       {capabilities.begin(), capabilities.end()});
 
-  flags.resource_provider_config_dir = resourceProviderConfigDir;
-  flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
 
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
 
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(slave);
 
   AWAIT_READY(slaveRegisteredMessage);
@@ -1233,10 +1260,15 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
   MesosSchedulerDriver driver(
       &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
 
-  // We use the filter explicitly here so that the resources will not
-  // be filtered for 5 seconds (the default).
-  Filters filters;
-  filters.set_refuse_seconds(0);
+  // We use the following filter so that the resources will not be
+  // filtered for 5 seconds (the default).
+  Filters acceptFilters;
+  acceptFilters.set_refuse_seconds(0);
+
+  // We use the following filter to filter offers that do not have
+  // wanted resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
@@ -1270,7 +1302,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
 
   // Decline offers that contain only the agent's default resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillRepeatedly(DeclineOffers());
+    .WillRepeatedly(DeclineOffers(declineFilters));
 
   EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
       std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
@@ -1278,7 +1310,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
     .WillOnce(FutureArg<1>(&rawDiskOffers));
 
   EXPECT_CALL(sched, offerRescinded(_, _))
-    .Times(AtMost(1));
+    .Times(AtLeast(1));
 
   driver.start();
 
@@ -1305,7 +1337,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
   driver.acceptOffers(
       {rawDiskOffers->at(0).id()},
       {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
-      filters);
+      acceptFilters);
 
   AWAIT_READY(volumeCreatedOffers);
   ASSERT_FALSE(volumeCreatedOffers->empty());
@@ -1375,7 +1407,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
            volumeCreatedOffers->at(0).slave_id(),
            persistentVolume,
            createCommandInfo("test -f " + path::join("volume", "file")))})},
-      filters);
+      acceptFilters);
 
   AWAIT_READY(taskStarting);
   EXPECT_EQ(TASK_STARTING, taskStarting->state());
@@ -1391,7 +1423,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
   // Restart the agent.
   slave.get()->terminate();
 
-  slave = StartSlave(detector.get(), flags);
+  slave = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(slave);
 
   AWAIT_READY(agentRecoveredOffers);
@@ -1406,7 +1438,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
       {agentRecoveredOffers->at(0).id()},
       {DESTROY(persistentVolume),
        DESTROY_VOLUME(volume.get())},
-      filters);
+      acceptFilters);
 
   AWAIT_READY(volumeDestroyedOffers);
   ASSERT_FALSE(volumeDestroyedOffers->empty());
@@ -1422,16 +1454,19 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
 {
   setupResourceProviderConfig(Bytes(0), "volume1:2GB;volume2:2GB");
 
-  Try<Owned<cluster::Master>> master = StartMaster();
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
 
-  slave::Flags flags = CreateSlaveFlags();
-  flags.isolation = "filesystem/linux";
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
 
   // Disable HTTP authentication to simplify resource provider interactions.
-  flags.authenticate_http_readwrite = false;
+  slaveFlags.authenticate_http_readwrite = false;
 
   // Set the resource provider capability.
   vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
@@ -1439,16 +1474,16 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
   capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
   capabilities.push_back(capability);
 
-  flags.agent_features = SlaveCapabilities();
-  flags.agent_features->mutable_capabilities()->CopyFrom(
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
       {capabilities.begin(), capabilities.end()});
 
-  flags.resource_provider_config_dir = resourceProviderConfigDir;
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
 
   Future<SlaveRegisteredMessage> slaveRegisteredMessage =
     FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
 
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(slave);
 
   AWAIT_READY(slaveRegisteredMessage);
@@ -1461,10 +1496,15 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
   MesosSchedulerDriver driver(
       &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
 
-  // We use the filter explicitly here so that the resources will not
-  // be filtered for 5 seconds (the default).
-  Filters filters;
-  filters.set_refuse_seconds(0);
+  // We use the following filter so that the resources will not be
+  // filtered for 5 seconds (the default).
+  Filters acceptFilters;
+  acceptFilters.set_refuse_seconds(0);
+
+  // We use the following filter to filter offers that do not have
+  // wanted resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
 
   EXPECT_CALL(sched, registered(&driver, _, _));
 
@@ -1490,7 +1530,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
 
   // Decline offers that contain only the agent's default resources.
   EXPECT_CALL(sched, resourceOffers(&driver, _))
-    .WillRepeatedly(DeclineOffers());
+    .WillRepeatedly(DeclineOffers(declineFilters));
 
   EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
       isPreExistingVolume)))
@@ -1519,7 +1559,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
       {rawDisksOffers->at(0).id()},
       {CREATE_VOLUME(sources.at(0), Resource::DiskInfo::Source::MOUNT),
        CREATE_BLOCK(sources.at(1))},
-      filters);
+      acceptFilters);
 
   AWAIT_READY(disksConvertedOffers);
   ASSERT_FALSE(disksConvertedOffers->empty());
@@ -1551,7 +1591,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
       {disksConvertedOffers->at(0).id()},
       {DESTROY_VOLUME(volume.get()),
        DESTROY_BLOCK(block.get())},
-      filters);
+      acceptFilters);
 
   AWAIT_READY(disksRevertedOffers);
   ASSERT_FALSE(disksRevertedOffers->empty());


[7/7] mesos git commit: Tested that agent resends unacknowledged operation updates on recovery.

Posted by gr...@apache.org.
Tested that agent resends unacknowledged operation updates on recovery.

This patch adds a test to verify that the agent resends unacknowledged
operation status updates after a recovery.

Review: https://reviews.apache.org/r/65182/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/336e9321
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/336e9321
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/336e9321

Branch: refs/heads/master
Commit: 336e932199643e88c0edbea7c1f08d4b45596389
Parents: 60f23d8
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Fri Jan 19 15:36:32 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:50:32 2018 -0800

----------------------------------------------------------------------
 .../storage_local_resource_provider_tests.cpp   | 190 +++++++++++++++++++
 1 file changed, 190 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/336e9321/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index f6d093a..5ac9180 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -2077,6 +2077,196 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_RetryOperationStatusUpdate)
   driver.join();
 }
 
+
+// This test verifies that on agent restarts, unacknowledged operation status
+// updates are resent to the master.
+//
+// To accomplish this, the test:
+//   1. Creates a volume from a RAW disk resource.
+//   2. Drops the first `UpdateOperationStatusMessage` from the agent to the
+//      master, so that it isn't acknowledged by the master.
+//   3. Restarts the agent.
+//   4. Verifies that the agent resends the operation status update, and that
+//      no further update is sent once the master has acknowledged it.
+TEST_F(
+    StorageLocalResourceProviderTest,
+    ROOT_RetryOperationStatusUpdateAfterRecovery)
+{
+  // The clock is paused so that timer-driven events (agent registration,
+  // status update retries) happen when the test advances the clock.
+  Clock::pause();
+
+  loadUriDiskProfileModule();
+
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileConfig();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags flags = CreateSlaveFlags();
+  flags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  flags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  flags.agent_features = SlaveCapabilities();
+  flags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  flags.resource_provider_config_dir = resourceProviderConfigDir;
+  flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, it is guaranteed that the slave will send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider.
+  //
+  // NOTE: The order of the two `FUTURE_PROTOBUF`s are reversed because
+  // Google Mock will search the expectations in reverse order.
+  Future<UpdateSlaveMessage> updateSlave2 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave1 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(flags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave1);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // periodically check if the CSI endpoint socket has been created by
+  // the plugin container, which runs in another Linux process.
+  Clock::resume();
+
+  AWAIT_READY(updateSlave2);
+  ASSERT_TRUE(updateSlave2->has_resource_providers());
+  ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+  Clock::pause();
+
+  // Register a framework to exercise an operation.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  Future<vector<Offer>> offers;
+
+  // Matches RAW disk resources that carry a profile.
+  auto isRaw = [](
+      const Resource& r) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().type() == Resource::DiskInfo::Source::RAW;
+  };
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(isRaw, lambda::_1))))
+    .WillOnce(FutureArg<1>(&offers));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+
+  const Offer& offer = offers->at(0);
+
+  Option<Resource> source;
+  foreach (const Resource& resource, offer.resources()) {
+    if (isRaw(resource)) {
+      source = resource;
+      break;
+    }
+  }
+  ASSERT_SOME(source);
+
+  // We'll drop the first operation status update from the agent to the master.
+  Future<UpdateOperationStatusMessage> droppedUpdateOperationStatusMessage =
+    DROP_PROTOBUF(
+        UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid);
+
+  // Create a volume.
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      {});
+
+  AWAIT_READY(droppedUpdateOperationStatusMessage);
+
+  // Restart the agent.
+  slave.get()->terminate();
+
+  // Since the local resource provider daemon is started after the agent
+  // is registered, the restarted slave will again send two
+  // `UpdateSlaveMessage`s, where the latter one contains resources from
+  // the storage local resource provider. As above, the futures are declared
+  // in reverse order because Google Mock searches expectations in reverse.
+  Future<UpdateSlaveMessage> updateSlave4 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+  Future<UpdateSlaveMessage> updateSlave3 =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  // Once the agent is restarted, the SLRP should resend the dropped operation
+  // status update.
+  Future<UpdateOperationStatusMessage> retriedUpdateOperationStatusMessage =
+    FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, master.get()->pid);
+
+  // The master should acknowledge the operation status update once.
+  Future<AcknowledgeOperationStatusMessage> acknowledgeOperationStatusMessage =
+    FUTURE_PROTOBUF(AcknowledgeOperationStatusMessage(), master.get()->pid, _);
+
+  // Decline all further offers. The master can send new offers once it
+  // receives the first `UpdateSlaveMessage` after the agent failover, or
+  // after receiving the `UpdateOperationStatusMessage`. Because this newer
+  // expectation matches any offer, Google Mock picks it over the RAW-disk
+  // expectation above.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers());
+
+  slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(flags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlave3);
+
+  // Resume the clock so that the CSI plugin's standalone container is created
+  // and the SLRP's async loop notices it.
+  Clock::resume();
+
+  AWAIT_READY(updateSlave4);
+  ASSERT_TRUE(updateSlave4->has_resource_providers());
+  ASSERT_EQ(1, updateSlave4->resource_providers().providers_size());
+
+  Clock::pause();
+
+  AWAIT_READY(retriedUpdateOperationStatusMessage);
+
+  AWAIT_READY(acknowledgeOperationStatusMessage);
+
+  // The master has acknowledged the operation status update, so the SLRP
+  // shouldn't send further operation status updates.
+  EXPECT_NO_FUTURE_PROTOBUFS(UpdateOperationStatusMessage(), _, _);
+
+  // Advance past the minimum retry interval: any resend would now violate
+  // the `EXPECT_NO_FUTURE_PROTOBUFS` expectation above.
+  Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+  Clock::settle();
+
+  driver.stop();
+  driver.join();
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[4/7] mesos git commit: Added a storage local resource provider test for CSI plugin restart.

Posted by gr...@apache.org.
Added a storage local resource provider test for CSI plugin restart.

The test performs the same operations as the `PublishResources` test, but it
kills the CSI plugin container before each operation.

Review: https://reviews.apache.org/r/64998/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/69e5e28e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/69e5e28e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/69e5e28e

Branch: refs/heads/master
Commit: 69e5e28ed0756f94c839a453052d268696d66a33
Parents: 0e4d6f2
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Jan 19 15:36:27 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:50:08 2018 -0800

----------------------------------------------------------------------
 src/Makefile.am                                 |   1 +
 src/slave/container_daemon.cpp                  |  47 +--
 src/slave/container_daemon_process.hpp          |  82 ++++++
 .../storage_local_resource_provider_tests.cpp   | 290 +++++++++++++++++++
 4 files changed, 375 insertions(+), 45 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 191594b..fe8f689 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1190,6 +1190,7 @@ libmesos_no_3rdparty_la_SOURCES +=					\
   slave/compatibility.hpp						\
   slave/constants.hpp							\
   slave/container_daemon.hpp						\
+  slave/container_daemon_process.hpp					\
   slave/flags.hpp							\
   slave/gc.hpp								\
   slave/gc_process.hpp							\

http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/slave/container_daemon.cpp
----------------------------------------------------------------------
diff --git a/src/slave/container_daemon.cpp b/src/slave/container_daemon.cpp
index d74fa51..6458d1f 100644
--- a/src/slave/container_daemon.cpp
+++ b/src/slave/container_daemon.cpp
@@ -16,20 +16,17 @@
 
 #include "slave/container_daemon.hpp"
 
-#include <mesos/agent/agent.hpp>
-
 #include <process/defer.hpp>
 #include <process/id.hpp>
-#include <process/process.hpp>
 
 #include <stout/lambda.hpp>
 #include <stout/stringify.hpp>
 #include <stout/unreachable.hpp>
 
-#include "common/http.hpp"
-
 #include "internal/evolve.hpp"
 
+#include "slave/container_daemon_process.hpp"
+
 namespace http = process::http;
 
 using std::string;
@@ -64,46 +61,6 @@ static inline http::Headers getAuthHeader(const Option<string>& authToken)
 }
 
 
-class ContainerDaemonProcess : public Process<ContainerDaemonProcess>
-{
-public:
-  explicit ContainerDaemonProcess(
-      const http::URL& _agentUrl,
-      const Option<string>& _authToken,
-      const ContainerID& containerId,
-      const Option<CommandInfo>& commandInfo,
-      const Option<Resources>& resources,
-      const Option<ContainerInfo>& containerInfo,
-      const Option<std::function<Future<Nothing>()>>& _postStartHook,
-      const Option<std::function<Future<Nothing>()>>& _postStopHook);
-
-  ContainerDaemonProcess(const ContainerDaemonProcess& other) = delete;
-
-  ContainerDaemonProcess& operator=(
-      const ContainerDaemonProcess& other) = delete;
-
-  Future<Nothing> wait();
-
-protected:
-  void initialize() override;
-
-private:
-  void launchContainer();
-  void waitContainer();
-
-  const http::URL agentUrl;
-  const Option<string> authToken;
-  const ContentType contentType;
-  const Option<std::function<Future<Nothing>()>> postStartHook;
-  const Option<std::function<Future<Nothing>()>> postStopHook;
-
-  Call launchCall;
-  Call waitCall;
-
-  Promise<Nothing> terminated;
-};
-
-
 ContainerDaemonProcess::ContainerDaemonProcess(
     const http::URL& _agentUrl,
     const Option<string>& _authToken,

http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/slave/container_daemon_process.hpp
----------------------------------------------------------------------
diff --git a/src/slave/container_daemon_process.hpp b/src/slave/container_daemon_process.hpp
new file mode 100644
index 0000000..a5d19a0
--- /dev/null
+++ b/src/slave/container_daemon_process.hpp
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __SLAVE_CONTAINER_DAEMON_PROCESS_HPP__
+#define __SLAVE_CONTAINER_DAEMON_PROCESS_HPP__
+
+#include <functional>
+#include <string>
+
+#include <mesos/mesos.hpp>
+#include <mesos/agent/agent.hpp>
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/process.hpp>
+
+#include <stout/option.hpp>
+
+#include "common/http.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+// The libprocess process backing a container daemon. It holds the agent API
+// calls (`launchCall`, `waitCall`) that it sends to the agent at `agentUrl`,
+// plus the optional post-start/post-stop hooks it is given.
+// NOTE(review): the hook and relaunch semantics are implemented in
+// container_daemon.cpp; confirm details there.
+//
+// This declaration lives in a header so that tests can refer to its members,
+// e.g., to detect a container relaunch by waiting for a dispatch of
+// `launchContainer`.
+class ContainerDaemonProcess : public process::Process<ContainerDaemonProcess>
+{
+public:
+  explicit ContainerDaemonProcess(
+      const process::http::URL& _agentUrl,
+      const Option<std::string>& _authToken,
+      const ContainerID& containerId,
+      const Option<CommandInfo>& commandInfo,
+      const Option<Resources>& resources,
+      const Option<ContainerInfo>& containerInfo,
+      const Option<std::function<process::Future<Nothing>()>>& _postStartHook,
+      const Option<std::function<process::Future<Nothing>()>>& _postStopHook);
+
+  // Non-copyable.
+  ContainerDaemonProcess(const ContainerDaemonProcess& other) = delete;
+
+  ContainerDaemonProcess& operator=(
+      const ContainerDaemonProcess& other) = delete;
+
+  // NOTE(review): presumably completes through `terminated`; confirm in
+  // container_daemon.cpp.
+  process::Future<Nothing> wait();
+
+  // Made public for testing purposes, so that tests can intercept
+  // dispatches to these methods (e.g., via `FUTURE_DISPATCH`).
+  void launchContainer();
+  void waitContainer();
+
+protected:
+  void initialize() override;
+
+private:
+  const process::http::URL agentUrl;
+  const Option<std::string> authToken;
+  const ContentType contentType;
+  const Option<std::function<process::Future<Nothing>()>> postStartHook;
+  const Option<std::function<process::Future<Nothing>()>> postStopHook;
+
+  // Agent API calls used to launch the container and to wait on it.
+  agent::Call launchCall;
+  agent::Call waitCall;
+
+  process::Promise<Nothing> terminated;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SLAVE_CONTAINER_DAEMON_PROCESS_HPP__

http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index dfe4faf..1b21527 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -24,6 +24,12 @@
 
 #include "module/manager.hpp"
 
+#include "slave/container_daemon_process.hpp"
+
+#include "slave/containerizer/fetcher.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
+
 #include "tests/flags.hpp"
 #include "tests/mesos.hpp"
 
@@ -31,6 +37,8 @@ using std::shared_ptr;
 using std::string;
 using std::vector;
 
+using mesos::internal::slave::ContainerDaemonProcess;
+
 using mesos::master::detector::MasterDetector;
 
 using mesos::v1::resource_provider::Call;
@@ -1449,6 +1457,288 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
 
 
 // This test verifies that the storage local resource provider can
+// restart its CSI plugin after it is killed and continue to work
+// properly.
+TEST_F(
+    StorageLocalResourceProviderTest,
+    ROOT_PublishUnpublishResourcesPluginKilled)
+{
+  loadUriDiskProfileModule();
+
+  setupResourceProviderConfig(Gigabytes(4));
+  setupDiskProfileConfig();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.isolation = "filesystem/linux";
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+  slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+  slave::Fetcher fetcher(slaveFlags);
+
+  // The containerizer is created explicitly so that the test can look up
+  // and kill the CSI plugin container.
+  Try<slave::MesosContainerizer*> _containerizer =
+    slave::MesosContainerizer::create(slaveFlags, false, &fetcher);
+  ASSERT_SOME(_containerizer);
+
+  Owned<slave::MesosContainerizer> containerizer(_containerizer.get());
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave =
+    StartSlave(detector.get(), containerizer.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // Register a framework to exercise operations.
+  FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+  framework.set_roles(0, "storage");
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+  // We use the following filter so that the resources will not be
+  // filtered for 5 seconds (the default).
+  Filters acceptFilters;
+  acceptFilters.set_refuse_seconds(0);
+
+  // We use the following filter to filter offers that do not have
+  // wanted resources for 365 days (the maximum).
+  Filters declineFilters;
+  declineFilters.set_refuse_seconds(Days(365).secs());
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // The framework is expected to see the following offers in sequence:
+  //   1. One containing a RAW disk resource before `CREATE_VOLUME`.
+  //   2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
+  //   3. One containing the same MOUNT disk resource after `CREATE`,
+  //      `LAUNCH` and `DESTROY`.
+  //   4. One containing the same RAW disk resource after `DESTROY_VOLUME`.
+  //
+  // We set up the expectations for these offers as the test progresses.
+  Future<vector<Offer>> rawDiskOffers;
+  Future<vector<Offer>> volumeCreatedOffers;
+  Future<vector<Offer>> taskFinishedOffers;
+  Future<vector<Offer>> volumeDestroyedOffers;
+
+  Sequence offers;
+
+  // We are only interested in storage pools and volumes created from
+  // them, which have a "volume-default" profile.
+  auto hasSourceType = [](
+      const Resource& r,
+      const Resource::DiskInfo::Source::Type& type) {
+    return r.has_disk() &&
+      r.disk().has_source() &&
+      r.disk().source().has_profile() &&
+      r.disk().source().profile() == "volume-default" &&
+      r.disk().source().type() == type;
+  };
+
+  // Decline offers that contain only the agent's default resources.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillRepeatedly(DeclineOffers(declineFilters));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+  driver.start();
+
+  AWAIT_READY(rawDiskOffers);
+  ASSERT_FALSE(rawDiskOffers->empty());
+
+  Option<Resource> source;
+
+  foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) {
+      source = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(source);
+
+  // Get the ID of the CSI plugin container.
+  Future<hashset<ContainerID>> pluginContainers = containerizer->containers();
+
+  AWAIT_READY(pluginContainers);
+  ASSERT_EQ(1u, pluginContainers->size());
+
+  // NOTE: The test reuses this ID for every kill below, i.e., it expects
+  // the plugin to be relaunched under the same container ID.
+  const ContainerID& pluginContainerId = *pluginContainers->begin();
+
+  Future<Nothing> pluginRestarted =
+    FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer);
+
+  // Kill the plugin container and wait for it to restart.
+  Future<int> pluginKilled = containerizer->status(pluginContainerId)
+    .then([](const ContainerStatus& status) {
+      return os::kill(status.executor_pid(), SIGKILL);
+    });
+
+  AWAIT_ASSERT_EQ(0, pluginKilled);
+  AWAIT_READY(pluginRestarted);
+
+  // Create a volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+      std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeCreatedOffers));
+
+  driver.acceptOffers(
+      {rawDiskOffers->at(0).id()},
+      {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+      acceptFilters);
+
+  AWAIT_READY(volumeCreatedOffers);
+  ASSERT_FALSE(volumeCreatedOffers->empty());
+
+  Option<Resource> volume;
+
+  foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
+    if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) {
+      volume = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(volume);
+  ASSERT_TRUE(volume->disk().source().has_id());
+  ASSERT_TRUE(volume->disk().source().has_metadata());
+  ASSERT_TRUE(volume->disk().source().has_mount());
+  ASSERT_TRUE(volume->disk().source().mount().has_root());
+  EXPECT_FALSE(path::absolute(volume->disk().source().mount().root()));
+
+  // Check if the volume is actually created by the test CSI plugin.
+  Option<string> volumePath;
+
+  foreach (const Label& label, volume->disk().source().metadata().labels()) {
+    if (label.key() == "path") {
+      volumePath = label.value();
+      break;
+    }
+  }
+
+  ASSERT_SOME(volumePath);
+  EXPECT_TRUE(os::exists(volumePath.get()));
+
+  pluginRestarted =
+    FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer);
+
+  // Kill the plugin container and wait for it to restart.
+  pluginKilled = containerizer->status(pluginContainerId)
+    .then([](const ContainerStatus& status) {
+      return os::kill(status.executor_pid(), SIGKILL);
+    });
+
+  AWAIT_ASSERT_EQ(0, pluginKilled);
+  AWAIT_READY(pluginRestarted);
+
+  // Put a file into the volume.
+  ASSERT_SOME(os::touch(path::join(volumePath.get(), "file")));
+
+  // Create a persistent volume on the CSI volume, then launch a task to
+  // use the persistent volume.
+  Resource persistentVolume = volume.get();
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_id(id::UUID::random().toString());
+  persistentVolume.mutable_disk()->mutable_persistence()
+    ->set_principal(framework.principal());
+  persistentVolume.mutable_disk()->mutable_volume()
+    ->set_container_path("volume");
+  persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW);
+
+  Future<TaskStatus> taskStarting;
+  Future<TaskStatus> taskRunning;
+  Future<TaskStatus> taskFinished;
+
+  EXPECT_CALL(sched, statusUpdate(&driver, _))
+    .WillOnce(FutureArg<1>(&taskStarting))
+    .WillOnce(FutureArg<1>(&taskRunning))
+    .WillOnce(FutureArg<1>(&taskFinished));
+
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(
+      persistentVolume)))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&taskFinishedOffers));
+
+  driver.acceptOffers(
+      {volumeCreatedOffers->at(0).id()},
+      {CREATE(persistentVolume),
+       LAUNCH({createTask(
+           volumeCreatedOffers->at(0).slave_id(),
+           persistentVolume,
+           createCommandInfo("test -f " + path::join("volume", "file")))})},
+      acceptFilters);
+
+  AWAIT_READY(taskStarting);
+  EXPECT_EQ(TASK_STARTING, taskStarting->state());
+
+  AWAIT_READY(taskRunning);
+  EXPECT_EQ(TASK_RUNNING, taskRunning->state());
+
+  AWAIT_READY(taskFinished);
+  EXPECT_EQ(TASK_FINISHED, taskFinished->state());
+
+  AWAIT_READY(taskFinishedOffers);
+
+  pluginRestarted =
+    FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer);
+
+  // Kill the plugin container and wait for it to restart.
+  pluginKilled = containerizer->status(pluginContainerId)
+    .then([](const ContainerStatus& status) {
+      return os::kill(status.executor_pid(), SIGKILL);
+    });
+
+  AWAIT_ASSERT_EQ(0, pluginKilled);
+  AWAIT_READY(pluginRestarted);
+
+  // Destroy the persistent volume and the CSI volume.
+  EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(source.get())))
+    .InSequence(offers)
+    .WillOnce(FutureArg<1>(&volumeDestroyedOffers));
+
+  driver.acceptOffers(
+      {taskFinishedOffers->at(0).id()},
+      {DESTROY(persistentVolume),
+       DESTROY_VOLUME(volume.get())},
+      acceptFilters);
+
+  AWAIT_READY(volumeDestroyedOffers);
+  ASSERT_FALSE(volumeDestroyedOffers->empty());
+
+  // Check if the volume is actually deleted by the test CSI plugin.
+  EXPECT_FALSE(os::exists(volumePath.get()));
+
+  // Tear down the framework explicitly, as the other tests do, before the
+  // master and agent are destroyed.
+  driver.stop();
+  driver.join();
+}
+
+
+// This test verifies that the storage local resource provider can
 // convert pre-existing CSI volumes into mount or block volumes.
 TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
 {


[2/7] mesos git commit: Renamed storage local resource provider tests to describe them better.

Posted by gr...@apache.org.
Renamed storage local resource provider tests to describe them better.

Review: https://reviews.apache.org/r/64994/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a0283a9f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a0283a9f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a0283a9f

Branch: refs/heads/master
Commit: a0283a9f29354088e51f837ad61d776434070f3a
Parents: 4924443
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Jan 19 15:36:24 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:49:03 2018 -0800

----------------------------------------------------------------------
 .../storage_local_resource_provider_tests.cpp   | 38 +++++++++-----------
 1 file changed, 17 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a0283a9f/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index d98b914..d5ec225 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -487,8 +487,8 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_SmallDisk)
 
 
 // This test verifies that a framework can receive offers having new
-// storage pools from the storage local resource provider due to
-// adding new profiles.
+// storage pools from the storage local resource provider after a new
+// profile appears.
 TEST_F(StorageLocalResourceProviderTest, ROOT_NewProfile)
 {
   Clock::pause();
@@ -591,10 +591,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewProfile)
 }
 
 
-// This test verifies that a framework can create then destroy a new
-// volume from the storage pool of a storage local resource provider
-// that uses the test CSI plugin.
-TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolume)
+// This test verifies that the storage local resource provider can
+// create then destroy a new volume from a storage pool.
+TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
 {
   loadUriDiskProfileModule();
 
@@ -771,10 +770,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolume)
 }
 
 
-// This test verifies that a framework can destroy a new volume created
-// from the storage pool of a storage local resource provider that uses
-// the test CSI plugin after recovery.
-TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolumeRecovery)
+// This test verifies that the storage local resource provider can
+// destroy a volume created from a storage pool after recovery.
+TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
 {
   loadUriDiskProfileModule();
 
@@ -967,10 +965,10 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolumeRecovery)
 }
 
 
-// This test verifies that a framework can launch a task using a created
-// volume from a storage local resource provider that uses the test CSI
-// plugin, then destroy the volume while it is published.
-TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTask)
+// This test verifies that the storage local resource provider can
+// publish a volume required by a task, then destroy the published
+// volume after the task finishes.
+TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
 {
   loadUriDiskProfileModule();
 
@@ -1186,10 +1184,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTask)
 }
 
 
-// This test verifies that a framework can destroy a volume that was
-// created from a storage pool of a storage local resource provider
-// and used by a task after recovery.
-TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTaskRecovery)
+// This test verifies that the storage local resource provider can
+// destroy a published volume after recovery.
+TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
 {
   loadUriDiskProfileModule();
 
@@ -1419,9 +1416,8 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTaskRecovery)
 }
 
 
-// This test verifies that a framework can convert pre-existing volumes
-// from a storage local resource provider that uses the test CSI plugin
-// into mount or block volumes.
+// This test verifies that the storage local resource provider can
+// convert pre-existing CSI volumes into mount or block volumes.
 TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
 {
   setupResourceProviderConfig(Bytes(0), "volume1:2GB;volume2:2GB");


[5/7] mesos git commit: Resumed the clock if necessary when destroying test agent.

Posted by gr...@apache.org.
Resumed the clock if necessary when destroying test agent.

Because the `cgroups::destroy()` code path makes use of `delay()`,
the clock must not be paused in order for the destructor of the
test `Slave` to reliably destroy all remaining containers.

This patch updates the destructor to check if the clock is paused
and, if so, resume it before destroying containers and pause it again
once the containers have been destroyed.

Review: https://reviews.apache.org/r/65232/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/434ef5f4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/434ef5f4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/434ef5f4

Branch: refs/heads/master
Commit: 434ef5f431d62113d649e7a7c946c55d43a8034a
Parents: 69e5e28
Author: Greg Mann <gr...@mesosphere.io>
Authored: Fri Jan 19 15:36:29 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:50:32 2018 -0800

----------------------------------------------------------------------
 src/tests/cluster.cpp | 14 ++++++++++++++
 1 file changed, 14 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/434ef5f4/src/tests/cluster.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.cpp b/src/tests/cluster.cpp
index 568c9c7..19a41c7 100644
--- a/src/tests/cluster.cpp
+++ b/src/tests/cluster.cpp
@@ -641,6 +641,16 @@ Slave::~Slave()
 
     AWAIT_READY(containers);
 
+    // Because the `cgroups::destroy()` code path makes use of `delay()`, the
+    // clock must not be paused in order to reliably destroy all remaining
+    // containers. If necessary, we resume the clock here and then pause it
+    // again when we're done destroying containers.
+    bool paused = process::Clock::paused();
+
+    if (paused) {
+      process::Clock::resume();
+    }
+
     foreach (const ContainerID& containerId, containers.get()) {
       process::Future<Option<ContainerTermination>> wait =
         containerizer->wait(containerId);
@@ -656,6 +666,10 @@ Slave::~Slave()
       }
     }
 
+    if (paused) {
+      process::Clock::pause();
+    }
+
     containers = containerizer->containers();
     AWAIT_READY(containers);