You are viewing a plain-text version of this content. The canonical link to it (a hyperlink in the original page) was not preserved in this copy.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/01/20 00:44:11 UTC
[1/7] mesos git commit: Added SLRP unit tests for profile updates and
corner cases.
Repository: mesos
Updated Branches:
refs/heads/master ddde32525 -> 336e93219
Added SLRP unit tests for profile updates and corner cases.
This patch adds the following storage local resource provider (SLRP) tests:
`NoResource`: The resource provider (RP) updates its state with no
resources, and can recover from a checkpointed state that contains no
resources.
`ZeroSizedDisk`: The CSI plugin reports a pre-existing volume with
zero capacity.
`SmallDisk`: The CSI plugin reports a storage pool and a pre-existing
volume, each smaller than 1MB.
`NewProfile`: A new profile is added after the RP has updated its state.
Review: https://reviews.apache.org/r/64992/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/4924443f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/4924443f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/4924443f
Branch: refs/heads/master
Commit: 4924443fc6d1b44c33e3403564746bd964e18703
Parents: ddde325
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Jan 19 15:36:22 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:48:43 2018 -0800
----------------------------------------------------------------------
src/examples/test_csi_plugin.cpp | 9 +-
.../storage_local_resource_provider_tests.cpp | 530 +++++++++++++++++--
2 files changed, 487 insertions(+), 52 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/4924443f/src/examples/test_csi_plugin.cpp
----------------------------------------------------------------------
diff --git a/src/examples/test_csi_plugin.cpp b/src/examples/test_csi_plugin.cpp
index f6b2c98..0f65c40 100644
--- a/src/examples/test_csi_plugin.cpp
+++ b/src/examples/test_csi_plugin.cpp
@@ -588,7 +588,12 @@ Status TestCSIPlugin::GetCapacity(
foreach (const csi::VolumeCapability& capability,
request->volume_capabilities()) {
- if (!capability.has_mount()) {
+ // We report zero capacity for any capability other than the
+ // default-constructed `MountVolume` capability since this plugin
+ // does not support any filesystem types and mount flags.
+ if (!capability.has_mount() ||
+ !capability.mount().fs_type().empty() ||
+ !capability.mount().mount_flags().empty()) {
response->set_available_capacity(0);
return Status::OK;
@@ -925,8 +930,6 @@ int main(int argc, char** argv)
Try<Bytes> capacity = Bytes::parse(pair[1]);
if (capacity.isError()) {
error = capacity.error();
- } else if (capacity.get() == 0) {
- error = "Volume capacity cannot be zero";
} else {
volumes.put(pair[0], capacity.get());
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/4924443f/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index bbfe95e..d98b914 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -14,6 +14,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
+#include <process/clock.hpp>
#include <process/future.hpp>
#include <process/gtest.hpp>
#include <process/gmock.hpp>
@@ -32,6 +33,9 @@ using std::vector;
using mesos::master::detector::MasterDetector;
+using mesos::v1::resource_provider::Call;
+
+using process::Clock;
using process::Future;
using process::Owned;
@@ -54,15 +58,57 @@ public:
{
MesosTest::SetUp();
- const string testCsiPluginWorkDir = path::join(sandbox.get(), "test");
+ testCsiPluginWorkDir = path::join(sandbox.get(), "test");
ASSERT_SOME(os::mkdir(testCsiPluginWorkDir));
resourceProviderConfigDir =
path::join(sandbox.get(), "resource_provider_configs");
-
ASSERT_SOME(os::mkdir(resourceProviderConfigDir));
- string testCsiPluginPath =
+ uriDiskProfileConfigPath =
+ path::join(sandbox.get(), "disk_profiles.json");
+ }
+
+ virtual void TearDown()
+ {
+ // Unload modules.
+ foreach (const Modules::Library& library, modules.libraries()) {
+ foreach (const Modules::Library::Module& module, library.modules()) {
+ if (module.has_name()) {
+ ASSERT_SOME(modules::ModuleManager::unload(module.name()));
+ }
+ }
+ }
+
+ MesosTest::TearDown();
+ }
+
+ void loadUriDiskProfileModule()
+ {
+ const string libraryPath = getModulePath("uri_disk_profile");
+
+ Modules::Library* library = modules.add_libraries();
+ library->set_name("uri_disk_profile");
+ library->set_file(libraryPath);
+
+ Modules::Library::Module* module = library->add_modules();
+ module->set_name(URI_DISK_PROFILE_ADAPTOR_NAME);
+
+ Parameter* uri = module->add_parameters();
+ uri->set_key("uri");
+ uri->set_value(uriDiskProfileConfigPath);
+ Parameter* pollInterval = module->add_parameters();
+ pollInterval->set_key("poll_interval");
+ pollInterval->set_value("1secs");
+
+ ASSERT_SOME(modules::ModuleManager::load(modules));
+ }
+
+ void setupResourceProviderConfig(
+ const Bytes& capacity,
+ const Option<string> volumes = None())
+ {
+ const string testCsiPluginPath =
path::join(tests::flags.build_dir, "src", "test-csi-plugin");
Try<string> resourceProviderConfig = strings::format(
@@ -91,8 +137,8 @@ public:
"value": "%s",
"arguments": [
"%s",
- "--available_capacity=2GB",
- "--volumes=volume1:1GB;volume2:1GB",
+ "--available_capacity=%s",
+ "--volumes=%s",
"--work_dir=%s"
]
}
@@ -104,6 +150,8 @@ public:
)~",
testCsiPluginPath,
testCsiPluginPath,
+ stringify(capacity),
+ volumes.getOrElse(""),
testCsiPluginWorkDir);
ASSERT_SOME(resourceProviderConfig);
@@ -111,19 +159,29 @@ public:
ASSERT_SOME(os::write(
path::join(resourceProviderConfigDir, "test.json"),
resourceProviderConfig.get()));
+ }
- uriDiskProfileConfigPath =
- path::join(sandbox.get(), "disk_profiles.json");
-
+ void setupDiskProfileConfig()
+ {
Try<Nothing> write = os::write(
uriDiskProfileConfigPath,
R"~(
{
"profile_matrix": {
- "default" : {
- "volume_capabilities" : {
- "mount" : {},
- "access_mode" : { "mode" : "SINGLE_NODE_WRITER" }
+ "volume-default": {
+ "volume_capabilities": {
+ "mount": {},
+ "access_mode": {
+ "mode": "SINGLE_NODE_WRITER"
+ }
+ }
+ },
+ "block-default": {
+ "volume_capabilities": {
+ "block": {},
+ "access_mode": {
+ "mode": "SINGLE_NODE_WRITER"
+ }
}
}
}
@@ -133,43 +191,404 @@ public:
ASSERT_SOME(write);
}
- virtual void TearDown()
- {
- // Unload modules.
- foreach (const Modules::Library& library, modules.libraries()) {
- foreach (const Modules::Library::Module& module, library.modules()) {
- if (module.has_name()) {
- ASSERT_SOME(modules::ModuleManager::unload(module.name()));
- }
- }
+protected:
+ Modules modules;
+ string resourceProviderConfigDir;
+ string testCsiPluginWorkDir;
+ string uriDiskProfileConfigPath;
+};
+
+
+// This test verifies that a storage local resource provider can report
+// no resource and recover from this state.
+TEST_F(StorageLocalResourceProviderTest, ROOT_NoResource)
+{
+ Clock::pause();
+
+ setupResourceProviderConfig(Bytes(0));
+
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.isolation = "filesystem/linux";
+
+ // Disable HTTP authentication to simplify resource provider interactions.
+ slaveFlags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability.
+ vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+ SlaveInfo::Capability capability;
+ capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+ capabilities.push_back(capability);
+
+ slaveFlags.agent_features = SlaveCapabilities();
+ slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+ {capabilities.begin(), capabilities.end()});
+
+ slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+
+ // Since the local resource provider daemon is started after the agent
+ // is registered, it is guaranteed that the slave will send two
+ // `UpdateSlaveMessage`s, where the latter one contains resources from
+ // the storage local resource provider.
+ // NOTE: The order of the two `FUTURE_PROTOBUF`s are reversed because
+ // Google Mock will search the expectations in reverse order.
+ Future<UpdateSlaveMessage> updateSlave2 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+ Future<UpdateSlaveMessage> updateSlave1 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration and prevent retry.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ AWAIT_READY(updateSlave1);
+
+ // NOTE: We need to resume the clock so that the resource provider can
+ // periodically check if the CSI endpoint socket has been created by
+ // the plugin container, which runs in another Linux process.
+ Clock::resume();
+
+ AWAIT_READY(updateSlave2);
+ ASSERT_TRUE(updateSlave2->has_resource_providers());
+ ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+ EXPECT_EQ(
+ 0,
+ updateSlave2->resource_providers().providers(0).total_resources_size());
+
+ Clock::pause();
+
+ // Restart the agent.
+ slave.get()->terminate();
+
+ // Since the local resource provider daemon is started after the agent
+ // is registered, it is guaranteed that the slave will send two
+ // `UpdateSlaveMessage`s, where the latter one contains resources from
+ // the storage local resource provider.
+ // NOTE: The order of the two `FUTURE_PROTOBUF`s are reversed because
+ // Google Mock will search the expectations in reverse order.
+ Future<UpdateSlaveMessage> updateSlave4 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+ Future<UpdateSlaveMessage> updateSlave3 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration and prevent retry.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ AWAIT_READY(updateSlave3);
+
+ Clock::resume();
+
+ AWAIT_READY(updateSlave4);
+ ASSERT_TRUE(updateSlave4->has_resource_providers());
+ ASSERT_EQ(1, updateSlave4->resource_providers().providers_size());
+ EXPECT_EQ(
+ 0,
+ updateSlave4->resource_providers().providers(0).total_resources_size());
+}
+
+
+// This test verifies that any zero-sized volume reported by a CSI
+// plugin will be ignored by the storage local resource provider.
+TEST_F(StorageLocalResourceProviderTest, ROOT_ZeroSizedDisk)
+{
+ Clock::pause();
+
+ setupResourceProviderConfig(Bytes(0), "volume0:0B");
+
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.isolation = "filesystem/linux";
+
+ // Disable HTTP authentication to simplify resource provider interactions.
+ slaveFlags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability.
+ vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+ SlaveInfo::Capability capability;
+ capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+ capabilities.push_back(capability);
+
+ slaveFlags.agent_features = SlaveCapabilities();
+ slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+ {capabilities.begin(), capabilities.end()});
+
+ slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+
+ // Since the local resource provider daemon is started after the agent
+ // is registered, it is guaranteed that the slave will send two
+ // `UpdateSlaveMessage`s, where the latter one contains resources from
+ // the storage local resource provider.
+ // NOTE: The order of the two `FUTURE_PROTOBUF`s are reversed because
+ // Google Mock will search the expectations in reverse order.
+ Future<UpdateSlaveMessage> updateSlave2 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+ Future<UpdateSlaveMessage> updateSlave1 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration and prevent retry.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ AWAIT_READY(updateSlave1);
+
+ Clock::resume();
+
+ AWAIT_READY(updateSlave2);
+ ASSERT_TRUE(updateSlave2->has_resource_providers());
+ ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+ Option<Resource> volume;
+ foreach (const Resource& resource,
+ updateSlave2->resource_providers().providers(0).total_resources()) {
+ if (Resources::hasResourceProvider(resource)) {
+ volume = resource;
}
+ }
- MesosTest::TearDown();
+ ASSERT_NONE(volume);
+}
+
+
+// This test verifies that the storage local resource provider can
+// handle disks less than 1MB correctly.
+TEST_F(StorageLocalResourceProviderTest, ROOT_SmallDisk)
+{
+ loadUriDiskProfileModule();
+
+ setupResourceProviderConfig(Kilobytes(512), "volume0:512KB");
+ setupDiskProfileConfig();
+
+ master::Flags masterFlags = CreateMasterFlags();
+
+ // Use a small allocation interval to speed up the test. We do this
+ // instead of manipulating the clock to keep the test concise and
+ // avoid waiting for `UpdateSlaveMessage`s and pausing/resuming the
+ // clock multiple times.
+ masterFlags.allocation_interval = Milliseconds(50);
+
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.isolation = "filesystem/linux";
+
+ // Disable HTTP authentication to simplify resource provider interactions.
+ slaveFlags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability.
+ vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+ SlaveInfo::Capability capability;
+ capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+ capabilities.push_back(capability);
+
+ slaveFlags.agent_features = SlaveCapabilities();
+ slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+ {capabilities.begin(), capabilities.end()});
+
+ slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+ slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ // Register a framework to receive offers.
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.set_roles(0, "storage");
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ // We use the following filter to filter offers that do not have
+ // wanted resources for 365 days (the maximum).
+ Filters declineFilters;
+ declineFilters.set_refuse_seconds(Days(365).secs());
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> rawDisksOffers;
+
+ // We are interested in offers that contains both the storage pool and
+ // the pre-existing volume.
+ auto isStoragePool = [](const Resource& r) {
+ return r.has_disk() &&
+ r.disk().has_source() &&
+ r.disk().source().type() == Resource::DiskInfo::Source::RAW &&
+ !r.disk().source().has_id() &&
+ r.disk().source().has_profile();
+ };
+
+ auto isPreExistingVolume = [](const Resource& r) {
+ return r.has_disk() &&
+ r.disk().has_source() &&
+ r.disk().source().has_id() &&
+ !r.disk().source().has_profile();
+ };
+
+ // Since the master may send out offers before the resource provider
+ // reports the storage pool, we decline offers that do not have any
+ // storage pool. This would also decline offers that contain only the
+ // agent's default resources.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillRepeatedly(DeclineOffers(declineFilters));
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ isStoragePool)))
+ .WillOnce(FutureArg<1>(&rawDisksOffers));
+
+ driver.start();
+
+ AWAIT_READY(rawDisksOffers);
+ ASSERT_FALSE(rawDisksOffers->empty());
+
+ Option<Resource> storagePool;
+ Option<Resource> preExistingVolume;
+ foreach (const Resource& resource, rawDisksOffers->at(0).resources()) {
+ if (isStoragePool(resource)) {
+ storagePool = resource;
+ } else if (isPreExistingVolume(resource)) {
+ preExistingVolume = resource;
+ }
}
- void loadUriDiskProfileModule()
- {
- string libraryPath = getModulePath("uri_disk_profile");
+ ASSERT_SOME(storagePool);
+ EXPECT_EQ(
+ Kilobytes(512),
+ Bytes(storagePool->scalar().value() * Bytes::MEGABYTES));
- Modules::Library* library = modules.add_libraries();
- library->set_name("uri_disk_profile");
- library->set_file(libraryPath);
+ ASSERT_SOME(preExistingVolume);
+ EXPECT_EQ(
+ Kilobytes(512),
+ Bytes(preExistingVolume->scalar().value() * Bytes::MEGABYTES));
+}
- Modules::Library::Module* module = library->add_modules();
- module->set_name(URI_DISK_PROFILE_ADAPTOR_NAME);
- Parameter* parameter = module->add_parameters();
- parameter->set_key("uri");
- parameter->set_value(uriDiskProfileConfigPath);
+// This test verifies that a framework can receive offers having new
+// storage pools from the storage local resource provider due to
+// adding new profiles.
+TEST_F(StorageLocalResourceProviderTest, ROOT_NewProfile)
+{
+ Clock::pause();
- ASSERT_SOME(modules::ModuleManager::load(modules));
+ loadUriDiskProfileModule();
+
+ setupResourceProviderConfig(Gigabytes(4));
+
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.isolation = "filesystem/linux";
+
+ // Disable HTTP authentication to simplify resource provider interactions.
+ slaveFlags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability.
+ vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+ SlaveInfo::Capability capability;
+ capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+ capabilities.push_back(capability);
+
+ slaveFlags.agent_features = SlaveCapabilities();
+ slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+ {capabilities.begin(), capabilities.end()});
+
+ slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+ slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+ // Since the local resource provider daemon is started after the agent
+ // is registered, it is guaranteed that the slave will send two
+ // `UpdateSlaveMessage`s, where the latter one contains resources from
+ // the storage local resource provider.
+ // NOTE: The order of the two `FUTURE_PROTOBUF`s are reversed because
+ // Google Mock will search the expectations in reverse order.
+ Future<UpdateSlaveMessage> updateSlave2 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+ Future<UpdateSlaveMessage> updateSlave1 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration and prevent retry.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ AWAIT_READY(updateSlave1);
+
+ Clock::resume();
+
+ // No resource should be reported by the resource provider before
+ // adding any profile.
+ AWAIT_READY(updateSlave2);
+ ASSERT_TRUE(updateSlave2->has_resource_providers());
+ ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+ EXPECT_EQ(
+ 0,
+ updateSlave2->resource_providers().providers(0).total_resources_size());
+
+ Future<UpdateSlaveMessage> updateSlave3 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ // Add new profiles.
+ setupDiskProfileConfig();
+
+ // A new storage pool for profile "volume-default" should be reported
+ // by the resource provider. Still expect no storage pool for
+ // "block-default" since it is not supported by the test CSI plugin.
+ AWAIT_READY(updateSlave3);
+ ASSERT_TRUE(updateSlave3->has_resource_providers());
+ ASSERT_EQ(1, updateSlave3->resource_providers().providers_size());
+ EXPECT_EQ(
+ 1,
+ updateSlave3->resource_providers().providers(0).total_resources_size());
+
+ Option<Resource> volumeStoragePool;
+ Option<Resource> blockStoragePool;
+ foreach (const Resource& resource,
+ updateSlave3->resource_providers().providers(0).total_resources()) {
+ if (!resource.has_disk() ||
+ !resource.disk().has_source() ||
+ resource.disk().source().type() != Resource::DiskInfo::Source::RAW ||
+ !resource.disk().source().has_profile() ||
+ resource.disk().source().has_id()) {
+ continue;
+ }
+
+ if (resource.disk().source().profile() == "volume-default") {
+ volumeStoragePool = resource;
+ } else if (resource.disk().source().profile() == "block-default") {
+ blockStoragePool = resource;
+ }
}
-protected:
- Modules modules;
- string resourceProviderConfigDir;
- string uriDiskProfileConfigPath;
-};
+ EXPECT_SOME(volumeStoragePool);
+ EXPECT_NONE(blockStoragePool);
+}
// This test verifies that a framework can create then destroy a new
@@ -179,6 +598,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolume)
{
loadUriDiskProfileModule();
+ setupResourceProviderConfig(Gigabytes(4));
+ setupDiskProfileConfig();
+
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
@@ -237,14 +659,14 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolume)
Sequence offers;
// We are only interested in storage pools and volume created from
- // them, which have a "default" profile.
+ // them, which have a "volume-default" profile.
auto hasSourceType = [](
const Resource& r,
const Resource::DiskInfo::Source::Type& type) {
return r.has_disk() &&
r.disk().has_source() &&
r.disk().source().has_profile() &&
- r.disk().source().profile() == "default" &&
+ r.disk().source().profile() == "volume-default" &&
r.disk().source().type() == type;
};
@@ -356,6 +778,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolumeRecovery)
{
loadUriDiskProfileModule();
+ setupResourceProviderConfig(Gigabytes(4));
+ setupDiskProfileConfig();
+
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
@@ -417,14 +842,14 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolumeRecovery)
Sequence offers;
// We are only interested in storage pools and volume created from
- // them, which have a "default" profile.
+ // them, which have a "volume-default" profile.
auto hasSourceType = [](
const Resource& r,
const Resource::DiskInfo::Source::Type& type) {
return r.has_disk() &&
r.disk().has_source() &&
r.disk().source().has_profile() &&
- r.disk().source().profile() == "default" &&
+ r.disk().source().profile() == "volume-default" &&
r.disk().source().type() == type;
};
@@ -549,6 +974,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTask)
{
loadUriDiskProfileModule();
+ setupResourceProviderConfig(Gigabytes(4));
+ setupDiskProfileConfig();
+
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
@@ -612,14 +1040,14 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTask)
Sequence offers;
// We are only interested in storage pools and volume created from
- // them, which have a "default" profile.
+ // them, which have a "volume-default" profile.
auto hasSourceType = [](
const Resource& r,
const Resource::DiskInfo::Source::Type& type) {
return r.has_disk() &&
r.disk().has_source() &&
r.disk().source().has_profile() &&
- r.disk().source().profile() == "default" &&
+ r.disk().source().profile() == "volume-default" &&
r.disk().source().type() == type;
};
@@ -765,6 +1193,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTaskRecovery)
{
loadUriDiskProfileModule();
+ setupResourceProviderConfig(Gigabytes(4));
+ setupDiskProfileConfig();
+
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
@@ -829,14 +1260,14 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTaskRecovery)
Sequence offers;
// We are only interested in storage pools and volume created from
- // them, which have a "default" profile.
+ // them, which have a "volume-default" profile.
auto hasSourceType = [](
const Resource& r,
const Resource::DiskInfo::Source::Type& type) {
return r.has_disk() &&
r.disk().has_source() &&
r.disk().source().has_profile() &&
- r.disk().source().profile() == "default" &&
+ r.disk().source().profile() == "volume-default" &&
r.disk().source().type() == type;
};
@@ -991,8 +1422,10 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTaskRecovery)
// This test verifies that a framework can convert pre-existing volumes
// from a storage local resource provider that uses the test CSI plugin
// into mount or block volumes.
-TEST_F(StorageLocalResourceProviderTest, ROOT_PreExistingVolume)
+TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
{
+ setupResourceProviderConfig(Bytes(0), "volume1:2GB;volume2:2GB");
+
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
@@ -1157,7 +1590,6 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PreExistingVolume)
}
}
-
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[6/7] mesos git commit: Tested that operation updates dropped en
route to master are resent.
Posted by gr...@apache.org.
Tested that operation status updates dropped en route to the master are
resent.
This patch adds
`StorageLocalResourceProviderTest.ROOT_RetryOperationStatusUpdate`,
which verifies that operation status updates are resent by the
agent after being dropped en route to the master.
Review: https://reviews.apache.org/r/65057/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/60f23d87
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/60f23d87
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/60f23d87
Branch: refs/heads/master
Commit: 60f23d870080c5d70963857cb06a50cf0d2825fb
Parents: 434ef5f
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Fri Jan 19 15:36:31 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:50:32 2018 -0800
----------------------------------------------------------------------
.../storage_local_resource_provider_tests.cpp | 161 +++++++++++++++++++
1 file changed, 161 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/60f23d87/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index 1b21527..f6d093a 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -1916,6 +1916,167 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
}
}
+
+// This test verifies that operation status updates are resent to the master
+// after being dropped en route to it.
+//
+// To accomplish this:
+// 1. Creates a volume from a RAW disk resource.
+// 2. Drops the first `UpdateOperationStatusMessage` from the agent to the
+// master, so that it isn't acknowledged by the master.
+// 3. Advances the clock and verifies that the agent resends the operation
+// status update.
+TEST_F(StorageLocalResourceProviderTest, ROOT_RetryOperationStatusUpdate)
+{
+ Clock::pause();
+
+ loadUriDiskProfileModule();
+
+ setupResourceProviderConfig(Gigabytes(4));
+ setupDiskProfileConfig();
+
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.isolation = "filesystem/linux";
+
+ // Disable HTTP authentication to simplify resource provider interactions.
+ flags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability.
+ vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+ SlaveInfo::Capability capability;
+ capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+ capabilities.push_back(capability);
+
+ flags.agent_features = SlaveCapabilities();
+ flags.agent_features->mutable_capabilities()->CopyFrom(
+ {capabilities.begin(), capabilities.end()});
+
+ flags.resource_provider_config_dir = resourceProviderConfigDir;
+ flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+ // Since the local resource provider daemon is started after the agent
+ // is registered, it is guaranteed that the slave will send two
+ // `UpdateSlaveMessage`s, where the latter one contains resources from
+ // the storage local resource provider.
+ //
+ // NOTE: The order of the two `FUTURE_PROTOBUF`s are reversed because
+ // Google Mock will search the expectations in reverse order.
+ Future<UpdateSlaveMessage> updateSlave2 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+ Future<UpdateSlaveMessage> updateSlave1 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration.
+ Clock::advance(flags.registration_backoff_factor);
+
+ AWAIT_READY(updateSlave1);
+
+ // NOTE: We need to resume the clock so that the resource provider can
+ // periodically check if the CSI endpoint socket has been created by
+ // the plugin container, which runs in another Linux process.
+ Clock::resume();
+
+ AWAIT_READY(updateSlave2);
+ ASSERT_TRUE(updateSlave2->has_resource_providers());
+ ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+ Clock::pause();
+
+ // Register a framework to exercise an operation.
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.set_roles(0, "storage");
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers;
+
+ auto isRaw = [](
+ const Resource& r) {
+ return r.has_disk() &&
+ r.disk().has_source() &&
+ r.disk().source().has_profile() &&
+ r.disk().source().type() == Resource::DiskInfo::Source::RAW;
+ };
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ std::bind(isRaw, lambda::_1))))
+ .WillOnce(FutureArg<1>(&offers));
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->empty());
+
+ const Offer& offer = offers->at(0);
+
+ Option<Resource> source;
+ foreach (const Resource& resource, offer.resources()) {
+ if (isRaw(resource)) {
+ source = resource;
+ break;
+ }
+ }
+ ASSERT_SOME(source);
+
+ // We'll drop the first operation status update from the agent to the master.
+ Future<UpdateOperationStatusMessage> droppedUpdateOperationStatusMessage =
+ DROP_PROTOBUF(
+ UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid);
+
+ // Create a volume.
+ driver.acceptOffers(
+ {offer.id()},
+ {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+ {});
+
+ AWAIT_READY(droppedUpdateOperationStatusMessage);
+
+ // The SLRP should resend the dropped operation status update after the
+ // status update retry interval minimum.
+ Future<UpdateOperationStatusMessage> retriedUpdateOperationStatusMessage =
+ FUTURE_PROTOBUF(
+ UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid);
+
+ // The master should acknowledge the operation status update.
+ Future<AcknowledgeOperationStatusMessage> acknowledgeOperationStatusMessage =
+ FUTURE_PROTOBUF(
+ AcknowledgeOperationStatusMessage(), master.get()->pid, slave.get()->pid);
+
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+
+ AWAIT_READY(retriedUpdateOperationStatusMessage);
+ AWAIT_READY(acknowledgeOperationStatusMessage);
+
+ // The master acknowledged the operation status update, so the SLRP shouldn't
+ // send further operation status updates.
+ EXPECT_NO_FUTURE_PROTOBUFS(UpdateOperationStatusMessage(), _, _);
+
+ // The master received the `UpdateOperationStatusMessage`, so it can now
+ // offer the `MOUNT` disk - no further offers are needed, so they can be
+ // declined.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillRepeatedly(DeclineOffers());
+
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+
+ driver.stop();
+ driver.join();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[3/7] mesos git commit: Sped up storage local resource provider unit
tests.
Posted by gr...@apache.org.
Sped up storage local resource provider unit tests.
This patch speeds up all storage local resource provider unit tests by
using a master allocation interval of 50ms and by declining unwanted
offers with filters.
Review: https://reviews.apache.org/r/65059/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0e4d6f2f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0e4d6f2f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0e4d6f2f
Branch: refs/heads/master
Commit: 0e4d6f2ff8c0ebaa1a43e87ceab47108fd2eb31e
Parents: a0283a9
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Jan 19 15:36:26 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:49:42 2018 -0800
----------------------------------------------------------------------
.../storage_local_resource_provider_tests.cpp | 212 +++++++++++--------
1 file changed, 126 insertions(+), 86 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0e4d6f2f/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index d5ec225..dfe4faf 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -40,7 +40,7 @@ using process::Future;
using process::Owned;
using testing::Args;
-using testing::AtMost;
+using testing::AtLeast;
using testing::Sequence;
namespace mesos {
@@ -600,16 +600,19 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
setupResourceProviderConfig(Gigabytes(4));
setupDiskProfileConfig();
- Try<Owned<cluster::Master>> master = StartMaster();
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.allocation_interval = Milliseconds(50);
+
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
- slave::Flags flags = CreateSlaveFlags();
- flags.isolation = "filesystem/linux";
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.isolation = "filesystem/linux";
// Disable HTTP authentication to simplify resource provider interactions.
- flags.authenticate_http_readwrite = false;
+ slaveFlags.authenticate_http_readwrite = false;
// Set the resource provider capability.
vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
@@ -617,17 +620,17 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
capabilities.push_back(capability);
- flags.agent_features = SlaveCapabilities();
- flags.agent_features->mutable_capabilities()->CopyFrom(
+ slaveFlags.agent_features = SlaveCapabilities();
+ slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
{capabilities.begin(), capabilities.end()});
- flags.resource_provider_config_dir = resourceProviderConfigDir;
- flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+ slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+ slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
@@ -640,10 +643,15 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
MesosSchedulerDriver driver(
&sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
- // We use the filter explicitly here so that the resources will not
- // be filtered for 5 seconds (the default).
- Filters filters;
- filters.set_refuse_seconds(0);
+ // We use the following filter so that the resources will not be
+ // filtered for 5 seconds (the default).
+ Filters acceptFilters;
+ acceptFilters.set_refuse_seconds(0);
+
+ // We use the following filter to filter offers that do not have
+ // wanted resources for 365 days (the maximum).
+ Filters declineFilters;
+ declineFilters.set_refuse_seconds(Days(365).secs());
EXPECT_CALL(sched, registered(&driver, _, _));
@@ -671,7 +679,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
// Decline offers that contain only the agent's default resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillRepeatedly(DeclineOffers());
+ .WillRepeatedly(DeclineOffers(declineFilters));
EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
@@ -708,7 +716,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
driver.acceptOffers(
{rawDiskOffers->at(0).id()},
{CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
- filters);
+ acceptFilters);
AWAIT_READY(volumeCreatedOffers);
ASSERT_FALSE(volumeCreatedOffers->empty());
@@ -746,7 +754,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
driver.acceptOffers(
{volumeCreatedOffers->at(0).id()},
{DESTROY_VOLUME(volume.get())},
- filters);
+ acceptFilters);
AWAIT_READY(volumeDestroyedOffers);
ASSERT_FALSE(volumeDestroyedOffers->empty());
@@ -779,16 +787,19 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
setupResourceProviderConfig(Gigabytes(4));
setupDiskProfileConfig();
- Try<Owned<cluster::Master>> master = StartMaster();
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.allocation_interval = Milliseconds(50);
+
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
- slave::Flags flags = CreateSlaveFlags();
- flags.isolation = "filesystem/linux";
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.isolation = "filesystem/linux";
// Disable HTTP authentication to simplify resource provider interactions.
- flags.authenticate_http_readwrite = false;
+ slaveFlags.authenticate_http_readwrite = false;
// Set the resource provider capability.
vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
@@ -796,17 +807,17 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
capabilities.push_back(capability);
- flags.agent_features = SlaveCapabilities();
- flags.agent_features->mutable_capabilities()->CopyFrom(
+ slaveFlags.agent_features = SlaveCapabilities();
+ slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
{capabilities.begin(), capabilities.end()});
- flags.resource_provider_config_dir = resourceProviderConfigDir;
- flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+ slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+ slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
@@ -819,10 +830,15 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
MesosSchedulerDriver driver(
&sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
- // We use the filter explicitly here so that the resources will not
- // be filtered for 5 seconds (the default).
- Filters filters;
- filters.set_refuse_seconds(0);
+ // We use the following filter so that the resources will not be
+ // filtered for 5 seconds (the default).
+ Filters acceptFilters;
+ acceptFilters.set_refuse_seconds(0);
+
+ // We use the following filter to filter offers that do not have
+ // wanted resources for 365 days (the maximum).
+ Filters declineFilters;
+ declineFilters.set_refuse_seconds(Days(365).secs());
EXPECT_CALL(sched, registered(&driver, _, _));
@@ -853,7 +869,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
// Decline offers that contain only the agent's default resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillRepeatedly(DeclineOffers());
+ .WillRepeatedly(DeclineOffers(declineFilters));
EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
@@ -872,7 +888,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
.WillOnce(FutureArg<1>(&volumeDestroyedOffers));
EXPECT_CALL(sched, offerRescinded(_, _))
- .Times(AtMost(1));
+ .Times(AtLeast(1));
driver.start();
@@ -894,7 +910,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
driver.acceptOffers(
{rawDiskOffers->at(0).id()},
{CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
- filters);
+ acceptFilters);
AWAIT_READY(volumeCreatedOffers);
ASSERT_FALSE(volumeCreatedOffers->empty());
@@ -931,7 +947,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
// Restart the agent.
slave.get()->terminate();
- slave = StartSlave(detector.get(), flags);
+ slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(agentRecoveredOffers);
@@ -941,7 +957,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
driver.acceptOffers(
{agentRecoveredOffers->at(0).id()},
{DESTROY_VOLUME(volume.get())},
- filters);
+ acceptFilters);
AWAIT_READY(volumeDestroyedOffers);
ASSERT_FALSE(volumeDestroyedOffers->empty());
@@ -975,16 +991,19 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
setupResourceProviderConfig(Gigabytes(4));
setupDiskProfileConfig();
- Try<Owned<cluster::Master>> master = StartMaster();
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.allocation_interval = Milliseconds(50);
+
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
- slave::Flags flags = CreateSlaveFlags();
- flags.isolation = "filesystem/linux";
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.isolation = "filesystem/linux";
// Disable HTTP authentication to simplify resource provider interactions.
- flags.authenticate_http_readwrite = false;
+ slaveFlags.authenticate_http_readwrite = false;
// Set the resource provider capability.
vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
@@ -992,17 +1011,17 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
capabilities.push_back(capability);
- flags.agent_features = SlaveCapabilities();
- flags.agent_features->mutable_capabilities()->CopyFrom(
+ slaveFlags.agent_features = SlaveCapabilities();
+ slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
{capabilities.begin(), capabilities.end()});
- flags.resource_provider_config_dir = resourceProviderConfigDir;
- flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+ slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+ slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
@@ -1015,10 +1034,15 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
MesosSchedulerDriver driver(
&sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
- // We use the filter explicitly here so that the resources will not
- // be filtered for 5 seconds (the default).
- Filters filters;
- filters.set_refuse_seconds(0);
+ // We use the following filter so that the resources will not be
+ // filtered for 5 seconds (the default).
+ Filters acceptFilters;
+ acceptFilters.set_refuse_seconds(0);
+
+ // We use the following filter to filter offers that do not have
+ // wanted resources for 365 days (the maximum).
+ Filters declineFilters;
+ declineFilters.set_refuse_seconds(Days(365).secs());
EXPECT_CALL(sched, registered(&driver, _, _));
@@ -1051,7 +1075,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
// Decline offers that contain only the agent's default resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillRepeatedly(DeclineOffers());
+ .WillRepeatedly(DeclineOffers(declineFilters));
EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
@@ -1083,7 +1107,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
driver.acceptOffers(
{rawDiskOffers->at(0).id()},
{CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
- filters);
+ acceptFilters);
AWAIT_READY(volumeCreatedOffers);
ASSERT_FALSE(volumeCreatedOffers->empty());
@@ -1152,7 +1176,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
volumeCreatedOffers->at(0).slave_id(),
persistentVolume,
createCommandInfo("test -f " + path::join("volume", "file")))})},
- filters);
+ acceptFilters);
AWAIT_READY(taskStarting);
EXPECT_EQ(TASK_STARTING, taskStarting->state());
@@ -1174,7 +1198,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
{taskFinishedOffers->at(0).id()},
{DESTROY(persistentVolume),
DESTROY_VOLUME(volume.get())},
- filters);
+ acceptFilters);
AWAIT_READY(volumeDestroyedOffers);
ASSERT_FALSE(volumeDestroyedOffers->empty());
@@ -1193,16 +1217,19 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
setupResourceProviderConfig(Gigabytes(4));
setupDiskProfileConfig();
- Try<Owned<cluster::Master>> master = StartMaster();
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.allocation_interval = Milliseconds(50);
+
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
- slave::Flags flags = CreateSlaveFlags();
- flags.isolation = "filesystem/linux";
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.isolation = "filesystem/linux";
// Disable HTTP authentication to simplify resource provider interactions.
- flags.authenticate_http_readwrite = false;
+ slaveFlags.authenticate_http_readwrite = false;
// Set the resource provider capability.
vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
@@ -1210,17 +1237,17 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
capabilities.push_back(capability);
- flags.agent_features = SlaveCapabilities();
- flags.agent_features->mutable_capabilities()->CopyFrom(
+ slaveFlags.agent_features = SlaveCapabilities();
+ slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
{capabilities.begin(), capabilities.end()});
- flags.resource_provider_config_dir = resourceProviderConfigDir;
- flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+ slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+ slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
@@ -1233,10 +1260,15 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
MesosSchedulerDriver driver(
&sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
- // We use the filter explicitly here so that the resources will not
- // be filtered for 5 seconds (the default).
- Filters filters;
- filters.set_refuse_seconds(0);
+ // We use the following filter so that the resources will not be
+ // filtered for 5 seconds (the default).
+ Filters acceptFilters;
+ acceptFilters.set_refuse_seconds(0);
+
+ // We use the following filter to filter offers that do not have
+ // wanted resources for 365 days (the maximum).
+ Filters declineFilters;
+ declineFilters.set_refuse_seconds(Days(365).secs());
EXPECT_CALL(sched, registered(&driver, _, _));
@@ -1270,7 +1302,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
// Decline offers that contain only the agent's default resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillRepeatedly(DeclineOffers());
+ .WillRepeatedly(DeclineOffers(declineFilters));
EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
@@ -1278,7 +1310,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
.WillOnce(FutureArg<1>(&rawDiskOffers));
EXPECT_CALL(sched, offerRescinded(_, _))
- .Times(AtMost(1));
+ .Times(AtLeast(1));
driver.start();
@@ -1305,7 +1337,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
driver.acceptOffers(
{rawDiskOffers->at(0).id()},
{CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
- filters);
+ acceptFilters);
AWAIT_READY(volumeCreatedOffers);
ASSERT_FALSE(volumeCreatedOffers->empty());
@@ -1375,7 +1407,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
volumeCreatedOffers->at(0).slave_id(),
persistentVolume,
createCommandInfo("test -f " + path::join("volume", "file")))})},
- filters);
+ acceptFilters);
AWAIT_READY(taskStarting);
EXPECT_EQ(TASK_STARTING, taskStarting->state());
@@ -1391,7 +1423,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
// Restart the agent.
slave.get()->terminate();
- slave = StartSlave(detector.get(), flags);
+ slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(agentRecoveredOffers);
@@ -1406,7 +1438,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
{agentRecoveredOffers->at(0).id()},
{DESTROY(persistentVolume),
DESTROY_VOLUME(volume.get())},
- filters);
+ acceptFilters);
AWAIT_READY(volumeDestroyedOffers);
ASSERT_FALSE(volumeDestroyedOffers->empty());
@@ -1422,16 +1454,19 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
{
setupResourceProviderConfig(Bytes(0), "volume1:2GB;volume2:2GB");
- Try<Owned<cluster::Master>> master = StartMaster();
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.allocation_interval = Milliseconds(50);
+
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
- slave::Flags flags = CreateSlaveFlags();
- flags.isolation = "filesystem/linux";
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.isolation = "filesystem/linux";
// Disable HTTP authentication to simplify resource provider interactions.
- flags.authenticate_http_readwrite = false;
+ slaveFlags.authenticate_http_readwrite = false;
// Set the resource provider capability.
vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
@@ -1439,16 +1474,16 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
capabilities.push_back(capability);
- flags.agent_features = SlaveCapabilities();
- flags.agent_features->mutable_capabilities()->CopyFrom(
+ slaveFlags.agent_features = SlaveCapabilities();
+ slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
{capabilities.begin(), capabilities.end()});
- flags.resource_provider_config_dir = resourceProviderConfigDir;
+ slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
Future<SlaveRegisteredMessage> slaveRegisteredMessage =
FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
AWAIT_READY(slaveRegisteredMessage);
@@ -1461,10 +1496,15 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
MesosSchedulerDriver driver(
&sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
- // We use the filter explicitly here so that the resources will not
- // be filtered for 5 seconds (the default).
- Filters filters;
- filters.set_refuse_seconds(0);
+ // We use the following filter so that the resources will not be
+ // filtered for 5 seconds (the default).
+ Filters acceptFilters;
+ acceptFilters.set_refuse_seconds(0);
+
+ // We use the following filter to filter offers that do not have
+ // wanted resources for 365 days (the maximum).
+ Filters declineFilters;
+ declineFilters.set_refuse_seconds(Days(365).secs());
EXPECT_CALL(sched, registered(&driver, _, _));
@@ -1490,7 +1530,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
// Decline offers that contain only the agent's default resources.
EXPECT_CALL(sched, resourceOffers(&driver, _))
- .WillRepeatedly(DeclineOffers());
+ .WillRepeatedly(DeclineOffers(declineFilters));
EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
isPreExistingVolume)))
@@ -1519,7 +1559,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
{rawDisksOffers->at(0).id()},
{CREATE_VOLUME(sources.at(0), Resource::DiskInfo::Source::MOUNT),
CREATE_BLOCK(sources.at(1))},
- filters);
+ acceptFilters);
AWAIT_READY(disksConvertedOffers);
ASSERT_FALSE(disksConvertedOffers->empty());
@@ -1551,7 +1591,7 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
{disksConvertedOffers->at(0).id()},
{DESTROY_VOLUME(volume.get()),
DESTROY_BLOCK(block.get())},
- filters);
+ acceptFilters);
AWAIT_READY(disksRevertedOffers);
ASSERT_FALSE(disksRevertedOffers->empty());
[7/7] mesos git commit: Tested that agent resends unacknowledged
operation updates on recovery.
Posted by gr...@apache.org.
Tested that agent resends unacknowledged operation updates on recovery.
This patch adds a test to verify that the agent resends unacknowledged
operation status updates after a recovery.
Review: https://reviews.apache.org/r/65182/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/336e9321
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/336e9321
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/336e9321
Branch: refs/heads/master
Commit: 336e932199643e88c0edbea7c1f08d4b45596389
Parents: 60f23d8
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Fri Jan 19 15:36:32 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:50:32 2018 -0800
----------------------------------------------------------------------
.../storage_local_resource_provider_tests.cpp | 190 +++++++++++++++++++
1 file changed, 190 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/336e9321/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index f6d093a..5ac9180 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -2077,6 +2077,196 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_RetryOperationStatusUpdate)
driver.join();
}
+
+// This test verifies that on agent restarts, unacknowledged operation status
+// updates are resent to the master.
+//
+// To accomplish this:
+// 1. Creates a volume from a RAW disk resource.
+// 2. Drops the first `UpdateOperationStatusMessage` from the agent to the
+// master, so that it isn't acknowledged by the master.
+// 3. Restarts the agent.
+// 4. Verifies that the agent resends the operation status update.
+TEST_F(
+ StorageLocalResourceProviderTest,
+ ROOT_RetryOperationStatusUpdateAfterRecovery)
+{
+ Clock::pause();
+
+ loadUriDiskProfileModule();
+
+ setupResourceProviderConfig(Gigabytes(4));
+ setupDiskProfileConfig();
+
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags flags = CreateSlaveFlags();
+ flags.isolation = "filesystem/linux";
+
+ // Disable HTTP authentication to simplify resource provider interactions.
+ flags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability.
+ vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+ SlaveInfo::Capability capability;
+ capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+ capabilities.push_back(capability);
+
+ flags.agent_features = SlaveCapabilities();
+ flags.agent_features->mutable_capabilities()->CopyFrom(
+ {capabilities.begin(), capabilities.end()});
+
+ flags.resource_provider_config_dir = resourceProviderConfigDir;
+ flags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+ // Since the local resource provider daemon is started after the agent
+ // is registered, it is guaranteed that the slave will send two
+ // `UpdateSlaveMessage`s, where the latter one contains resources from
+ // the storage local resource provider.
+ //
+ // NOTE: The order of the two `FUTURE_PROTOBUF`s is reversed because
+ // Google Mock will search the expectations in reverse order.
+ Future<UpdateSlaveMessage> updateSlave2 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+ Future<UpdateSlaveMessage> updateSlave1 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration.
+ Clock::advance(flags.registration_backoff_factor);
+
+ AWAIT_READY(updateSlave1);
+
+ // NOTE: We need to resume the clock so that the resource provider can
+ // periodically check if the CSI endpoint socket has been created by
+ // the plugin container, which runs in another Linux process.
+ Clock::resume();
+
+ AWAIT_READY(updateSlave2);
+ ASSERT_TRUE(updateSlave2->has_resource_providers());
+ ASSERT_EQ(1, updateSlave2->resource_providers().providers_size());
+
+ Clock::pause();
+
+ // Register a framework to exercise an operation.
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.set_roles(0, "storage");
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ Future<vector<Offer>> offers;
+
+ auto isRaw = [](
+ const Resource& r) {
+ return r.has_disk() &&
+ r.disk().has_source() &&
+ r.disk().source().has_profile() &&
+ r.disk().source().type() == Resource::DiskInfo::Source::RAW;
+ };
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ std::bind(isRaw, lambda::_1))))
+ .WillOnce(FutureArg<1>(&offers));
+
+ driver.start();
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->empty());
+
+ const Offer& offer = offers->at(0);
+
+ Option<Resource> source;
+ foreach (const Resource& resource, offer.resources()) {
+ if (isRaw(resource)) {
+ source = resource;
+ break;
+ }
+ }
+ ASSERT_SOME(source);
+
+ // We'll drop the first operation status update from the agent to the master.
+ Future<UpdateOperationStatusMessage> droppedUpdateOperationStatusMessage =
+ DROP_PROTOBUF(
+ UpdateOperationStatusMessage(), slave.get()->pid, master.get()->pid);
+
+ // Create a volume.
+ driver.acceptOffers(
+ {offer.id()},
+ {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+ {});
+
+ AWAIT_READY(droppedUpdateOperationStatusMessage);
+
+ // Restart the agent.
+ slave.get()->terminate();
+
+ // Since the local resource provider daemon is started after the agent
+ // is registered, it is guaranteed that the slave will send two
+ // `UpdateSlaveMessage`s, where the latter one contains resources from
+ // the storage local resource provider.
+ Future<UpdateSlaveMessage> updateSlave4 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+ Future<UpdateSlaveMessage> updateSlave3 =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ // Once the agent is restarted, the SLRP should resend the dropped operation
+ // status update.
+ Future<UpdateOperationStatusMessage> retriedUpdateOperationStatusMessage =
+ FUTURE_PROTOBUF(UpdateOperationStatusMessage(), _, master.get()->pid);
+
+ // The master should acknowledge the resent operation status update.
+ Future<AcknowledgeOperationStatusMessage> acknowledgeOperationStatusMessage =
+ FUTURE_PROTOBUF(AcknowledgeOperationStatusMessage(), master.get()->pid, _);
+
+ // Decline all further offers; the master can send offers once it receives
+ // the first `UpdateSlaveMessage` after the agent failover, or after
+ // receiving the `UpdateOperationStatusMessage`.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillRepeatedly(DeclineOffers());
+
+ slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration.
+ Clock::advance(flags.registration_backoff_factor);
+
+ AWAIT_READY(updateSlave3);
+
+ // Resume the clock so that the CSI plugin's standalone container is created
+ // and the SLRP's async loop notices it.
+ Clock::resume();
+
+ AWAIT_READY(updateSlave4);
+ ASSERT_TRUE(updateSlave4->has_resource_providers());
+ ASSERT_EQ(1, updateSlave4->resource_providers().providers_size());
+
+ Clock::pause();
+
+ AWAIT_READY(retriedUpdateOperationStatusMessage);
+
+ AWAIT_READY(acknowledgeOperationStatusMessage);
+
+ // The master has acknowledged the operation status update, so the SLRP
+ // shouldn't send further operation status updates.
+ EXPECT_NO_FUTURE_PROTOBUFS(UpdateOperationStatusMessage(), _, _);
+
+ Clock::advance(slave::STATUS_UPDATE_RETRY_INTERVAL_MIN);
+ Clock::settle();
+
+ driver.stop();
+ driver.join();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[4/7] mesos git commit: Added a storage local resource provider test
for CSI plugin restart.
Posted by gr...@apache.org.
Added a storage local resource provider test for CSI plugin restart.
The test does the same as the `PublishResources` test, but it kills the
CSI plugin container between operations.
Review: https://reviews.apache.org/r/64998/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/69e5e28e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/69e5e28e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/69e5e28e
Branch: refs/heads/master
Commit: 69e5e28ed0756f94c839a453052d268696d66a33
Parents: 0e4d6f2
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Jan 19 15:36:27 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:50:08 2018 -0800
----------------------------------------------------------------------
src/Makefile.am | 1 +
src/slave/container_daemon.cpp | 47 +--
src/slave/container_daemon_process.hpp | 82 ++++++
.../storage_local_resource_provider_tests.cpp | 290 +++++++++++++++++++
4 files changed, 375 insertions(+), 45 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/Makefile.am
----------------------------------------------------------------------
diff --git a/src/Makefile.am b/src/Makefile.am
index 191594b..fe8f689 100644
--- a/src/Makefile.am
+++ b/src/Makefile.am
@@ -1190,6 +1190,7 @@ libmesos_no_3rdparty_la_SOURCES += \
slave/compatibility.hpp \
slave/constants.hpp \
slave/container_daemon.hpp \
+ slave/container_daemon_process.hpp \
slave/flags.hpp \
slave/gc.hpp \
slave/gc_process.hpp \
http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/slave/container_daemon.cpp
----------------------------------------------------------------------
diff --git a/src/slave/container_daemon.cpp b/src/slave/container_daemon.cpp
index d74fa51..6458d1f 100644
--- a/src/slave/container_daemon.cpp
+++ b/src/slave/container_daemon.cpp
@@ -16,20 +16,17 @@
#include "slave/container_daemon.hpp"
-#include <mesos/agent/agent.hpp>
-
#include <process/defer.hpp>
#include <process/id.hpp>
-#include <process/process.hpp>
#include <stout/lambda.hpp>
#include <stout/stringify.hpp>
#include <stout/unreachable.hpp>
-#include "common/http.hpp"
-
#include "internal/evolve.hpp"
+#include "slave/container_daemon_process.hpp"
+
namespace http = process::http;
using std::string;
@@ -64,46 +61,6 @@ static inline http::Headers getAuthHeader(const Option<string>& authToken)
}
-class ContainerDaemonProcess : public Process<ContainerDaemonProcess>
-{
-public:
- explicit ContainerDaemonProcess(
- const http::URL& _agentUrl,
- const Option<string>& _authToken,
- const ContainerID& containerId,
- const Option<CommandInfo>& commandInfo,
- const Option<Resources>& resources,
- const Option<ContainerInfo>& containerInfo,
- const Option<std::function<Future<Nothing>()>>& _postStartHook,
- const Option<std::function<Future<Nothing>()>>& _postStopHook);
-
- ContainerDaemonProcess(const ContainerDaemonProcess& other) = delete;
-
- ContainerDaemonProcess& operator=(
- const ContainerDaemonProcess& other) = delete;
-
- Future<Nothing> wait();
-
-protected:
- void initialize() override;
-
-private:
- void launchContainer();
- void waitContainer();
-
- const http::URL agentUrl;
- const Option<string> authToken;
- const ContentType contentType;
- const Option<std::function<Future<Nothing>()>> postStartHook;
- const Option<std::function<Future<Nothing>()>> postStopHook;
-
- Call launchCall;
- Call waitCall;
-
- Promise<Nothing> terminated;
-};
-
-
ContainerDaemonProcess::ContainerDaemonProcess(
const http::URL& _agentUrl,
const Option<string>& _authToken,
http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/slave/container_daemon_process.hpp
----------------------------------------------------------------------
diff --git a/src/slave/container_daemon_process.hpp b/src/slave/container_daemon_process.hpp
new file mode 100644
index 0000000..a5d19a0
--- /dev/null
+++ b/src/slave/container_daemon_process.hpp
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#ifndef __SLAVE_CONTAINER_DAEMON_PROCESS_HPP__
+#define __SLAVE_CONTAINER_DAEMON_PROCESS_HPP__
+
+#include <functional>
+#include <string>
+
+#include <mesos/mesos.hpp>
+#include <mesos/agent/agent.hpp>
+
+#include <process/future.hpp>
+#include <process/http.hpp>
+#include <process/process.hpp>
+
+#include <stout/option.hpp>
+
+#include "common/http.hpp"
+
+namespace mesos {
+namespace internal {
+namespace slave {
+
+class ContainerDaemonProcess : public process::Process<ContainerDaemonProcess>
+{
+public:
+ explicit ContainerDaemonProcess(
+ const process::http::URL& _agentUrl,
+ const Option<std::string>& _authToken,
+ const ContainerID& containerId,
+ const Option<CommandInfo>& commandInfo,
+ const Option<Resources>& resources,
+ const Option<ContainerInfo>& containerInfo,
+ const Option<std::function<process::Future<Nothing>()>>& _postStartHook,
+ const Option<std::function<process::Future<Nothing>()>>& _postStopHook);
+
+ ContainerDaemonProcess(const ContainerDaemonProcess& other) = delete;
+
+ ContainerDaemonProcess& operator=(
+ const ContainerDaemonProcess& other) = delete;
+
+ process::Future<Nothing> wait();
+
+ // Made public for testing purpose.
+ void launchContainer();
+ void waitContainer();
+
+protected:
+ void initialize() override;
+
+private:
+ const process::http::URL agentUrl;
+ const Option<std::string> authToken;
+ const ContentType contentType;
+ const Option<std::function<process::Future<Nothing>()>> postStartHook;
+ const Option<std::function<process::Future<Nothing>()>> postStopHook;
+
+ agent::Call launchCall;
+ agent::Call waitCall;
+
+ process::Promise<Nothing> terminated;
+};
+
+} // namespace slave {
+} // namespace internal {
+} // namespace mesos {
+
+#endif // __SLAVE_CONTAINER_DAEMON_PROCESS_HPP__
http://git-wip-us.apache.org/repos/asf/mesos/blob/69e5e28e/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index dfe4faf..1b21527 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -24,6 +24,12 @@
#include "module/manager.hpp"
+#include "slave/container_daemon_process.hpp"
+
+#include "slave/containerizer/fetcher.hpp"
+
+#include "slave/containerizer/mesos/containerizer.hpp"
+
#include "tests/flags.hpp"
#include "tests/mesos.hpp"
@@ -31,6 +37,8 @@ using std::shared_ptr;
using std::string;
using std::vector;
+using mesos::internal::slave::ContainerDaemonProcess;
+
using mesos::master::detector::MasterDetector;
using mesos::v1::resource_provider::Call;
@@ -1449,6 +1457,288 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
// This test verifies that the storage local resource provider can
+// restart its CSI plugin after it is killed and continue to work
+// properly.
+TEST_F(
+ StorageLocalResourceProviderTest,
+ ROOT_PublishUnpublishResourcesPluginKilled)
+{
+ loadUriDiskProfileModule();
+
+ setupResourceProviderConfig(Gigabytes(4));
+ setupDiskProfileConfig();
+
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.allocation_interval = Milliseconds(50);
+
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.isolation = "filesystem/linux";
+
+ // Disable HTTP authentication to simplify resource provider interactions.
+ slaveFlags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability.
+ vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+ SlaveInfo::Capability capability;
+ capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+ capabilities.push_back(capability);
+
+ slaveFlags.agent_features = SlaveCapabilities();
+ slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+ {capabilities.begin(), capabilities.end()});
+
+ slaveFlags.resource_provider_config_dir = resourceProviderConfigDir;
+ slaveFlags.disk_profile_adaptor = URI_DISK_PROFILE_ADAPTOR_NAME;
+
+ slave::Fetcher fetcher(slaveFlags);
+
+ Try<slave::MesosContainerizer*> _containerizer =
+ slave::MesosContainerizer::create(slaveFlags, false, &fetcher);
+ ASSERT_SOME(_containerizer);
+
+ Owned<slave::MesosContainerizer> containerizer(_containerizer.get());
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave =
+ StartSlave(detector.get(), containerizer.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ // Register a framework to exercise operations.
+ FrameworkInfo framework = DEFAULT_FRAMEWORK_INFO;
+ framework.set_roles(0, "storage");
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, framework, master.get()->pid, DEFAULT_CREDENTIAL);
+
+ // We use the following filter so that the resources will not be
+ // filtered for 5 seconds (the default).
+ Filters acceptFilters;
+ acceptFilters.set_refuse_seconds(0);
+
+ // We use the following filter to filter offers that do not have
+ // wanted resources for 365 days (the maximum).
+ Filters declineFilters;
+ declineFilters.set_refuse_seconds(Days(365).secs());
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ // The framework is expected to see the following offers in sequence:
+ // 1. One containing a RAW disk resource before `CREATE_VOLUME`.
+ // 2. One containing a MOUNT disk resource after `CREATE_VOLUME`.
+ // 3. One containing the same MOUNT disk resource after `CREATE`,
+ // `LAUNCH` and `DESTROY`.
+ // 4. One containing the same RAW disk resource after `DESTROY_VOLUME`.
+ //
+ // We set up the expectations for these offers as the test progresses.
+ Future<vector<Offer>> rawDiskOffers;
+ Future<vector<Offer>> volumeCreatedOffers;
+ Future<vector<Offer>> taskFinishedOffers;
+ Future<vector<Offer>> volumeDestroyedOffers;
+
+ Sequence offers;
+
+ // We are only interested in storage pools and volume created from
+ // them, which have a "volume-default" profile.
+ auto hasSourceType = [](
+ const Resource& r,
+ const Resource::DiskInfo::Source::Type& type) {
+ return r.has_disk() &&
+ r.disk().has_source() &&
+ r.disk().source().has_profile() &&
+ r.disk().source().profile() == "volume-default" &&
+ r.disk().source().type() == type;
+ };
+
+ // Decline offers that contain only the agent's default resources.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillRepeatedly(DeclineOffers(declineFilters));
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::RAW))))
+ .InSequence(offers)
+ .WillOnce(FutureArg<1>(&rawDiskOffers));
+
+ driver.start();
+
+ AWAIT_READY(rawDiskOffers);
+ ASSERT_FALSE(rawDiskOffers->empty());
+
+ Option<Resource> source;
+
+ foreach (const Resource& resource, rawDiskOffers->at(0).resources()) {
+ if (hasSourceType(resource, Resource::DiskInfo::Source::RAW)) {
+ source = resource;
+ break;
+ }
+ }
+
+ ASSERT_SOME(source);
+
+ // Get the ID of the CSI plugin container.
+ Future<hashset<ContainerID>> pluginContainers = containerizer->containers();
+
+ AWAIT_READY(pluginContainers);
+ ASSERT_EQ(1u, pluginContainers->size());
+
+ const ContainerID& pluginContainerId = *pluginContainers->begin();
+
+ Future<Nothing> pluginRestarted =
+ FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer);
+
+ // Kill the plugin container and wait for it to restart.
+ Future<int> pluginKilled = containerizer->status(pluginContainerId)
+ .then([](const ContainerStatus& status) {
+ return os::kill(status.executor_pid(), SIGKILL);
+ });
+
+ AWAIT_ASSERT_EQ(0, pluginKilled);
+ AWAIT_READY(pluginRestarted);
+
+ // Create a volume.
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveAnyResource(
+ std::bind(hasSourceType, lambda::_1, Resource::DiskInfo::Source::MOUNT))))
+ .InSequence(offers)
+ .WillOnce(FutureArg<1>(&volumeCreatedOffers));
+
+ driver.acceptOffers(
+ {rawDiskOffers->at(0).id()},
+ {CREATE_VOLUME(source.get(), Resource::DiskInfo::Source::MOUNT)},
+ acceptFilters);
+
+ AWAIT_READY(volumeCreatedOffers);
+ ASSERT_FALSE(volumeCreatedOffers->empty());
+
+ Option<Resource> volume;
+
+ foreach (const Resource& resource, volumeCreatedOffers->at(0).resources()) {
+ if (hasSourceType(resource, Resource::DiskInfo::Source::MOUNT)) {
+ volume = resource;
+ break;
+ }
+ }
+
+ ASSERT_SOME(volume);
+ ASSERT_TRUE(volume->disk().source().has_id());
+ ASSERT_TRUE(volume->disk().source().has_metadata());
+ ASSERT_TRUE(volume->disk().source().has_mount());
+ ASSERT_TRUE(volume->disk().source().mount().has_root());
+ EXPECT_FALSE(path::absolute(volume->disk().source().mount().root()));
+
+ // Check if the volume is actually created by the test CSI plugin.
+ Option<string> volumePath;
+
+ foreach (const Label& label, volume->disk().source().metadata().labels()) {
+ if (label.key() == "path") {
+ volumePath = label.value();
+ break;
+ }
+ }
+
+ ASSERT_SOME(volumePath);
+ EXPECT_TRUE(os::exists(volumePath.get()));
+
+ pluginRestarted =
+ FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer);
+
+ // Kill the plugin container and wait for it to restart.
+ pluginKilled = containerizer->status(pluginContainerId)
+ .then([](const ContainerStatus& status) {
+ return os::kill(status.executor_pid(), SIGKILL);
+ });
+
+ AWAIT_ASSERT_EQ(0, pluginKilled);
+ AWAIT_READY(pluginRestarted);
+
+ // Put a file into the volume.
+ ASSERT_SOME(os::touch(path::join(volumePath.get(), "file")));
+
+ // Create a persistent volume on the CSI volume, then launch a task to
+ // use the persistent volume.
+ Resource persistentVolume = volume.get();
+ persistentVolume.mutable_disk()->mutable_persistence()
+ ->set_id(id::UUID::random().toString());
+ persistentVolume.mutable_disk()->mutable_persistence()
+ ->set_principal(framework.principal());
+ persistentVolume.mutable_disk()->mutable_volume()
+ ->set_container_path("volume");
+ persistentVolume.mutable_disk()->mutable_volume()->set_mode(Volume::RW);
+
+ Future<TaskStatus> taskStarting;
+ Future<TaskStatus> taskRunning;
+ Future<TaskStatus> taskFinished;
+
+ EXPECT_CALL(sched, statusUpdate(&driver, _))
+ .WillOnce(FutureArg<1>(&taskStarting))
+ .WillOnce(FutureArg<1>(&taskRunning))
+ .WillOnce(FutureArg<1>(&taskFinished));
+
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(
+ persistentVolume)))
+ .InSequence(offers)
+ .WillOnce(FutureArg<1>(&taskFinishedOffers));
+
+ driver.acceptOffers(
+ {volumeCreatedOffers->at(0).id()},
+ {CREATE(persistentVolume),
+ LAUNCH({createTask(
+ volumeCreatedOffers->at(0).slave_id(),
+ persistentVolume,
+ createCommandInfo("test -f " + path::join("volume", "file")))})},
+ acceptFilters);
+
+ AWAIT_READY(taskStarting);
+ EXPECT_EQ(TASK_STARTING, taskStarting->state());
+
+ AWAIT_READY(taskRunning);
+ EXPECT_EQ(TASK_RUNNING, taskRunning->state());
+
+ AWAIT_READY(taskFinished);
+ EXPECT_EQ(TASK_FINISHED, taskFinished->state());
+
+ AWAIT_READY(taskFinishedOffers);
+
+ pluginRestarted =
+ FUTURE_DISPATCH(_, &ContainerDaemonProcess::launchContainer);
+
+ // Kill the plugin container and wait for it to restart.
+ pluginKilled = containerizer->status(pluginContainerId)
+ .then([](const ContainerStatus& status) {
+ return os::kill(status.executor_pid(), SIGKILL);
+ });
+
+ AWAIT_ASSERT_EQ(0, pluginKilled);
+ AWAIT_READY(pluginRestarted);
+
+ // Destroy the persistent volume and the CSI volume.
+ EXPECT_CALL(sched, resourceOffers(&driver, OffersHaveResource(source.get())))
+ .InSequence(offers)
+ .WillOnce(FutureArg<1>(&volumeDestroyedOffers));
+
+ driver.acceptOffers(
+ {taskFinishedOffers->at(0).id()},
+ {DESTROY(persistentVolume),
+ DESTROY_VOLUME(volume.get())},
+ acceptFilters);
+
+ AWAIT_READY(volumeDestroyedOffers);
+ ASSERT_FALSE(volumeDestroyedOffers->empty());
+
+ // Check if the volume is actually deleted by the test CSI plugin.
+ EXPECT_FALSE(os::exists(volumePath.get()));
+}
+
+
+// This test verifies that the storage local resource provider can
// convert pre-existing CSI volumes into mount or block volumes.
TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
{
[2/7] mesos git commit: Renamed storage local resource provider tests
to describe them better.
Posted by gr...@apache.org.
Renamed storage local resource provider tests to describe them better.
Review: https://reviews.apache.org/r/64994/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a0283a9f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a0283a9f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a0283a9f
Branch: refs/heads/master
Commit: a0283a9f29354088e51f837ad61d776434070f3a
Parents: 4924443
Author: Chun-Hung Hsiao <ch...@mesosphere.io>
Authored: Fri Jan 19 15:36:24 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:49:03 2018 -0800
----------------------------------------------------------------------
.../storage_local_resource_provider_tests.cpp | 38 +++++++++-----------
1 file changed, 17 insertions(+), 21 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a0283a9f/src/tests/storage_local_resource_provider_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/storage_local_resource_provider_tests.cpp b/src/tests/storage_local_resource_provider_tests.cpp
index d98b914..d5ec225 100644
--- a/src/tests/storage_local_resource_provider_tests.cpp
+++ b/src/tests/storage_local_resource_provider_tests.cpp
@@ -487,8 +487,8 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_SmallDisk)
// This test verifies that a framework can receive offers having new
-// storage pools from the storage local resource provider due to
-// adding new profiles.
+// storage pools from the storage local resource provider after a new
+// profile appears.
TEST_F(StorageLocalResourceProviderTest, ROOT_NewProfile)
{
Clock::pause();
@@ -591,10 +591,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewProfile)
}
-// This test verifies that a framework can create then destroy a new
-// volume from the storage pool of a storage local resource provider
-// that uses the test CSI plugin.
-TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolume)
+// This test verifies that the storage local resource provider can
+// create then destroy a new volume from a storage pool.
+TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolume)
{
loadUriDiskProfileModule();
@@ -771,10 +770,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolume)
}
-// This test verifies that a framework can destroy a new volume created
-// from the storage pool of a storage local resource provider that uses
-// the test CSI plugin after recovery.
-TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolumeRecovery)
+// This test verifies that the storage local resource provider can
+// destroy a volume created from a storage pool after recovery.
+TEST_F(StorageLocalResourceProviderTest, ROOT_CreateDestroyVolumeRecovery)
{
loadUriDiskProfileModule();
@@ -967,10 +965,10 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_NewVolumeRecovery)
}
-// This test verifies that a framework can launch a task using a created
-// volume from a storage local resource provider that uses the test CSI
-// plugin, then destroy the volume while it is published.
-TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTask)
+// This test verifies that the storage local resource provider can
+// publish a volume required by a task, then destroy the published
+// volume after the task finishes.
+TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResources)
{
loadUriDiskProfileModule();
@@ -1186,10 +1184,9 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTask)
}
-// This test verifies that a framework can destroy a volume that was
-// created from a storage pool of a storage local resource provider
-// and used by a task after recovery.
-TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTaskRecovery)
+// This test verifies that the storage local resource provider can
+// destroy a published volume after recovery.
+TEST_F(StorageLocalResourceProviderTest, ROOT_PublishResourcesRecovery)
{
loadUriDiskProfileModule();
@@ -1419,9 +1416,8 @@ TEST_F(StorageLocalResourceProviderTest, ROOT_LaunchTaskRecovery)
}
-// This test verifies that a framework can convert pre-existing volumes
-// from a storage local resource provider that uses the test CSI plugin
-// into mount or block volumes.
+// This test verifies that the storage local resource provider can
+// convert pre-existing CSI volumes into mount or block volumes.
TEST_F(StorageLocalResourceProviderTest, ROOT_ConvertPreExistingVolume)
{
setupResourceProviderConfig(Bytes(0), "volume1:2GB;volume2:2GB");
[5/7] mesos git commit: Resumed the clock if necessary when
destroying the test agent.
Posted by gr...@apache.org.
Resumed the clock if necessary when destroying the test agent.
Because the `cgroups::destroy()` code path makes use of `delay()`,
the clock must not be paused in order for the destructor of the
test `Slave` to reliably destroy all remaining containers.
This patch updates the destructor to check if the clock is paused
and, if so, resume it before destroying containers and pause it
again once they have been destroyed.
Review: https://reviews.apache.org/r/65232/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/434ef5f4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/434ef5f4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/434ef5f4
Branch: refs/heads/master
Commit: 434ef5f431d62113d649e7a7c946c55d43a8034a
Parents: 69e5e28
Author: Greg Mann <gr...@mesosphere.io>
Authored: Fri Jan 19 15:36:29 2018 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Fri Jan 19 15:50:32 2018 -0800
----------------------------------------------------------------------
src/tests/cluster.cpp | 14 ++++++++++++++
1 file changed, 14 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/434ef5f4/src/tests/cluster.cpp
----------------------------------------------------------------------
diff --git a/src/tests/cluster.cpp b/src/tests/cluster.cpp
index 568c9c7..19a41c7 100644
--- a/src/tests/cluster.cpp
+++ b/src/tests/cluster.cpp
@@ -641,6 +641,16 @@ Slave::~Slave()
AWAIT_READY(containers);
+ // Because the `cgroups::destroy()` code path makes use of `delay()`, the
+ // clock must not be paused in order to reliably destroy all remaining
+ // containers. If necessary, we resume the clock here and then pause it
+ // again when we're done destroying containers.
+ bool paused = process::Clock::paused();
+
+ if (paused) {
+ process::Clock::resume();
+ }
+
foreach (const ContainerID& containerId, containers.get()) {
process::Future<Option<ContainerTermination>> wait =
containerizer->wait(containerId);
@@ -656,6 +666,10 @@ Slave::~Slave()
}
}
+ if (paused) {
+ process::Clock::pause();
+ }
+
containers = containerizer->containers();
AWAIT_READY(containers);