You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/05/13 20:04:29 UTC
[1/2] mesos git commit: Persisted the reservation state on the slave.
Repository: mesos
Updated Branches:
refs/heads/master 860bb0fc3 -> 9db83273e
Persisted the reservation state on the slave.
Review: https://reviews.apache.org/r/32398
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/96e203f7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/96e203f7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/96e203f7
Branch: refs/heads/master
Commit: 96e203f75b1529586768d512f7ce09d8034fa02e
Parents: 860bb0f
Author: Michael Park <mc...@gmail.com>
Authored: Wed May 13 10:43:06 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed May 13 10:54:22 2015 -0700
----------------------------------------------------------------------
include/mesos/resources.hpp | 3 +
src/common/resources.cpp | 6 +
src/common/resources_utils.cpp | 42 ++-
src/tests/reservation_tests.cpp | 705 +++++++++++++++++++++++++++++++++--
4 files changed, 709 insertions(+), 47 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/96e203f7/include/mesos/resources.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/resources.hpp b/include/mesos/resources.hpp
index 4c036d3..f44cb62 100644
--- a/include/mesos/resources.hpp
+++ b/include/mesos/resources.hpp
@@ -114,6 +114,9 @@ public:
// Tests if the given Resource object is unreserved.
static bool isUnreserved(const Resource& resource);
+ // Tests if the given Resource object is dynamically reserved.
+ static bool isDynamicallyReserved(const Resource& resource);
+
// Returns the summed up Resources given a hashmap<Key, Resources>.
//
// NOTE: While scalar resources such as "cpus" sum correctly,
http://git-wip-us.apache.org/repos/asf/mesos/blob/96e203f7/src/common/resources.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources.cpp b/src/common/resources.cpp
index 235930f..843a06d 100644
--- a/src/common/resources.cpp
+++ b/src/common/resources.cpp
@@ -502,6 +502,12 @@ bool Resources::isUnreserved(const Resource& resource)
}
+bool Resources::isDynamicallyReserved(const Resource& resource)
+{
+ return resource.has_reservation();
+}
+
+
/////////////////////////////////////////////////
// Public member functions.
/////////////////////////////////////////////////
http://git-wip-us.apache.org/repos/asf/mesos/blob/96e203f7/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
index fe04d57..8d6aaaa 100644
--- a/src/common/resources_utils.cpp
+++ b/src/common/resources_utils.cpp
@@ -26,11 +26,21 @@ namespace mesos {
bool needCheckpointing(const Resource& resource)
{
- // TODO(mpark): Consider framework reservations.
- return Resources::isPersistentVolume(resource);
+ return Resources::isDynamicallyReserved(resource) ||
+ Resources::isPersistentVolume(resource);
}
+// NOTE: We effectively duplicate the logic in 'Resources::apply'
+// which is less than ideal. But we can not simply create
+// 'Offer::Operation' and invoke 'Resources::apply' here.
+// 'RESERVE' operation requires that the specified resources are
+// dynamically reserved only, and 'CREATE' requires that the
+// specified resources are already dynamically reserved.
+// These requirements are violated when we try to infer dynamically
+// reserved persistent volumes.
+// TODO(mpark): Consider introducing an atomic 'RESERVE_AND_CREATE'
+// operation to solve this problem.
Try<Resources> applyCheckpointedResources(
const Resources& resources,
const Resources& checkpointedResources)
@@ -42,23 +52,25 @@ Try<Resources> applyCheckpointedResources(
return Error("Unexpected checkpointed resources " + stringify(resource));
}
- // TODO(jieyu): Apply RESERVE operation if 'resource' is
- // dynamically reserved.
+ Resource stripped = resource;
- if (Resources::isPersistentVolume(resource)) {
- Offer::Operation create;
- create.set_type(Offer::Operation::CREATE);
- create.mutable_create()->add_volumes()->CopyFrom(resource);
+ if (Resources::isDynamicallyReserved(resource)) {
+ stripped.set_role("*");
+ stripped.clear_reservation();
+ }
- Try<Resources> applied = totalResources.apply(create);
- if (applied.isError()) {
- return Error(
- "Cannot find transition for checkpointed resource " +
- stringify(resource));
- }
+ if (Resources::isPersistentVolume(resource)) {
+ stripped.clear_disk();
+ }
- totalResources = applied.get();
+ if (!totalResources.contains(stripped)) {
+ return Error(
+ "Incompatible slave resources: " + stringify(totalResources) +
+ " does not contain " + stringify(stripped));
}
+
+ totalResources -= stripped;
+ totalResources += resource;
}
return totalResources;
http://git-wip-us.apache.org/repos/asf/mesos/blob/96e203f7/src/tests/reservation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_tests.cpp b/src/tests/reservation_tests.cpp
index c0edf3c..9020d04 100644
--- a/src/tests/reservation_tests.cpp
+++ b/src/tests/reservation_tests.cpp
@@ -102,8 +102,8 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
MesosSchedulerDriver driver(
&sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
- // We use the filter explicitly here so that the resources
- // will not be filtered for 5 seconds (by default).
+ // We use the filter explicitly here so that the resources will not
+ // be filtered for 5 seconds (by default).
Filters filters;
filters.set_refuse_seconds(0);
@@ -122,7 +122,7 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
driver.start();
- // In the first offer, expect an offer with the unreserved resources.
+ // In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(offers.get().size(), 1);
@@ -137,7 +137,7 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
// Reserve the resources.
driver.acceptOffers({offer.id()}, {RESERVE(dynamicallyReserved)}, filters);
- // In the next offer, expect an offer with the reserved resources.
+ // In the next offer, expect an offer with reserved resources.
AWAIT_READY(offers);
ASSERT_EQ(offers.get().size(), 1);
@@ -152,7 +152,7 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
// Unreserve the resources.
driver.acceptOffers({offer.id()}, {UNRESERVE(dynamicallyReserved)}, filters);
- // In the next offer, expect an offer with the unreserved resources.
+ // In the next offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(offers.get().size(), 1);
@@ -167,10 +167,10 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
}
-// This tests that a framework can send back a Reserve followed by
-// a LaunchTasks offer operation as a response to an offer, which
-// updates the resources in the allocator then proceeds to launch
-// the task with the reserved resources. The reserved resources are
+// This tests that a framework can send back a Reserve followed by a
+// LaunchTasks offer operation as a response to an offer, which
+// updates the resources in the allocator then proceeds to launch the
+// task with the reserved resources. The reserved resources are
// reoffered to the framework on task completion. The framework then
// sends back an Unreserved offer operation to unreserve the reserved
// resources. We test that the framework receives the unreserved
@@ -214,7 +214,7 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
driver.start();
- // In the first offer, expect an offer with the unreserved resources.
+ // In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(offers.get().size(), 1);
@@ -244,7 +244,7 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
{RESERVE(dynamicallyReserved),
LAUNCH({taskInfo})});
- // In the next offer, expect an offer with the reserved resources.
+ // In the next offer, expect an offer with reserved resources.
AWAIT_READY(offers);
ASSERT_EQ(offers.get().size(), 1);
@@ -256,15 +256,15 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
EXPECT_CALL(sched, resourceOffers(&driver, _))
.WillOnce(FutureArg<1>(&offers));
- // We use the filter explicitly here so that the resources
- // will not be filtered for 5 seconds (by default).
+ // We use the filter explicitly here so that the resources will not
+ // be filtered for 5 seconds (by default).
Filters filters;
filters.set_refuse_seconds(0);
// Unreserve the resources.
driver.acceptOffers({offer.id()}, {UNRESERVE(dynamicallyReserved)}, filters);
- // In the next offer, expect an offer with the unreserved resources.
+ // In the next offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(offers.get().size(), 1);
@@ -280,11 +280,11 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
// This test launches 2 frameworks in the same role. framework1
-// reserves resources by sending back a Reserve offer operation.
-// We first test that framework1 receives the reserved resources,
-// then on the next resource offer, framework1 declines the offer.
-// This should lead to framework2 receiving the resources that
-// framework1 reserved.
+// reserves resources by sending back a Reserve offer operation. We
+// first test that framework1 receives the reserved resources, then on
+// the next resource offer, framework1 declines the offer. This
+// should lead to framework2 receiving the resources that framework1
+// reserved.
TEST_F(ReservationTest, ReserveShareWithinRole)
{
string role = "role";
@@ -333,7 +333,7 @@ TEST_F(ReservationTest, ReserveShareWithinRole)
driver1.start();
- // In the first offer, expect an offer with the unreserved resources.
+ // In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(offers.get().size(), 1);
@@ -345,15 +345,15 @@ TEST_F(ReservationTest, ReserveShareWithinRole)
EXPECT_CALL(sched1, resourceOffers(&driver1, _))
.WillOnce(FutureArg<1>(&offers));
- // We use the filter explicitly here so that the resources
- // will not be filtered for 5 seconds (by default).
+ // We use the filter explicitly here so that the resources will not
+ // be filtered for 5 seconds (by default).
Filters filters;
filters.set_refuse_seconds(0);
// Reserve the resources.
driver1.acceptOffers({offer.id()}, {RESERVE(dynamicallyReserved)}, filters);
- // In the next offer, expect an offer with the reserved resources.
+ // In the next offer, expect an offer with reserved resources.
AWAIT_READY(offers);
ASSERT_EQ(offers.get().size(), 1);
@@ -445,7 +445,7 @@ TEST_F(ReservationTest, DropReserveTooLarge)
driver.start();
- // In the first offer, expect an offer with the unreserved resources.
+ // In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
ASSERT_EQ(offers.get().size(), 1);
@@ -461,8 +461,8 @@ TEST_F(ReservationTest, DropReserveTooLarge)
EXPECT_CALL(allocator, updateAllocation(_, _, _))
.Times(0);
- // We use the filter explicitly here so that the resources
- // will not be filtered for 5 seconds (by default).
+ // We use the filter explicitly here so that the resources will not
+ // be filtered for 5 seconds (by default).
Filters filters;
filters.set_refuse_seconds(0);
@@ -488,8 +488,8 @@ TEST_F(ReservationTest, DropReserveTooLarge)
}
-// This tests that an attempt to dynamically reserve statically reserved
-// resources is dropped.
+// This tests that an attempt to dynamically reserve statically
+// reserved resources is dropped.
TEST_F(ReservationTest, DropReserveStaticReservation)
{
TestAllocator<> allocator;
@@ -536,8 +536,8 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
driver.start();
- // In the first offer, expect an offer with the statically
- // reserved resources.
+ // In the first offer, expect an offer with the statically reserved
+ // resources.
AWAIT_READY(offers);
ASSERT_EQ(offers.get().size(), 1);
@@ -561,8 +561,8 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
// Attempt to reserve the statically reserved resources.
driver.acceptOffers({offer.id()}, {RESERVE(dynamicallyReserved)}, filters);
- // In the next offer, still expect an offer with the statically reserved
- // resources.
+ // In the next offer, still expect an offer with the statically
+ // reserved resources.
AWAIT_READY(offers);
ASSERT_EQ(offers.get().size(), 1);
@@ -576,6 +576,647 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
Shutdown();
}
+
+// This test verifies that CheckpointResourcesMessages are sent to the
+// slave when a framework reserves/unreserves resources, and the
+// resources in the messages correctly reflect the resources that need
+// to be checkpointed on the slave.
+TEST_F(ReservationTest, SendingCheckpointResourcesMessage)
+{
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("role");
+
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.allocation_interval = Milliseconds(50);
+ masterFlags.roles = frameworkInfo.role();
+
+ Try<PID<Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.resources = "cpus:8;mem:4096";
+
+ Try<PID<Slave>> slave = StartSlave(slaveFlags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ Resources unreserved1 = Resources::parse("cpus:8").get();
+ Resources reserved1 = unreserved1.flatten(
+ frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+ Resources unreserved2 = Resources::parse("mem:2048").get();
+ Resources reserved2 = unreserved2.flatten(
+ frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+ // We use this to capture offers from 'resourceOffers'.
+ Future<vector<Offer>> offers;
+
+ // The expectation for the first offer.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ driver.start();
+
+ // In the first offer, expect the sum of 'unreserved1' and
+ // 'unreserved2'.
+ AWAIT_READY(offers);
+
+ ASSERT_EQ(offers.get().size(), 1u);
+ Offer offer = offers.get()[0];
+
+ EXPECT_TRUE(Resources(offer.resources()).contains(unreserved1 + unreserved2));
+
+ Future<CheckpointResourcesMessage> message3 =
+ FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+ Future<CheckpointResourcesMessage> message2 =
+ FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+ Future<CheckpointResourcesMessage> message1 =
+ FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+ // We use the filter explicitly here so that the resources will not
+ // be filtered for 5 seconds (by default).
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ // Attempt to reserve and unreserve resources.
+ driver.acceptOffers(
+ {offer.id()},
+ {RESERVE(reserved1), RESERVE(reserved2), UNRESERVE(reserved1)},
+ filters);
+
+ // NOTE: Currently, we send one message per operation. But this is
+ // an implementation detail which is subject to change.
+
+ // Expect the 'RESERVE(reserved1)' as the first message.
+ // The checkpointed resources should correspond to 'reserved1'.
+ AWAIT_READY(message1);
+ EXPECT_EQ(Resources(message1.get().resources()), reserved1);
+
+ // Expect the 'RESERVE(reserved2)' as the second message.
+ // The checkpointed resources should correspond to
+ // 'reserved1 + reserved2'.
+ AWAIT_READY(message2);
+ EXPECT_EQ(Resources(message2.get().resources()), reserved1 + reserved2);
+
+ // Expect the 'UNRESERVE(reserved1)' as the third message.
+ // The checkpointed resources should correspond to 'reserved2'.
+ AWAIT_READY(message3);
+ EXPECT_EQ(Resources(message3.get().resources()), reserved2);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test verifies that the slave checkpoints the resources for
+// dynamic reservations to the disk, recovers them upon restart, and
+// sends them to the master during re-registration.
+TEST_F(ReservationTest, ResourcesCheckpointing)
+{
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("role");
+
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.allocation_interval = Milliseconds(50);
+ masterFlags.roles = frameworkInfo.role();
+
+ Try<PID<Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.recover = "reconnect";
+ slaveFlags.resources = "cpus:8;mem:4096";
+
+ Try<PID<Slave>> slave = StartSlave(slaveFlags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
+ Resources reserved = unreserved.flatten(
+ frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+ // We use this to capture offers from 'resourceOffers'.
+ Future<vector<Offer>> offers;
+
+ // The expectation for the offer.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ driver.start();
+
+ // Expect an offer with the unreserved resources.
+ AWAIT_READY(offers);
+
+ ASSERT_EQ(offers.get().size(), 1u);
+ Offer offer = offers.get()[0];
+
+ EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+
+ Future<CheckpointResourcesMessage> checkpointResources =
+ FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, slave.get());
+
+ // We use the filter explicitly here so that the resources
+ // will not be filtered for 5 seconds (by default).
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ // Reserve the resources.
+ driver.acceptOffers({offer.id()}, {RESERVE(reserved)}, filters);
+
+ // Expect to receive the 'CheckpointResourcesMessage'.
+ AWAIT_READY(checkpointResources);
+
+ // Restart the slave without shutting down.
+ Stop(slave.get(), false);
+
+ Future<ReregisterSlaveMessage> reregisterSlave =
+ FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+ Future<Nothing> slaveRecover = FUTURE_DISPATCH(_, &Slave::recover);
+
+ slave = StartSlave(slaveFlags);
+ ASSERT_SOME(slave);
+
+ // Expect the slave to recover.
+ AWAIT_READY(slaveRecover);
+
+ // Expect to receive the 'ReregisterSlaveMessage' containing the
+ // reserved resources.
+ AWAIT_READY(reregisterSlave);
+
+ EXPECT_EQ(reregisterSlave.get().checkpointed_resources(), reserved);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test verifies the case where a slave that has checkpointed
+// dynamic reservations reregisters with a failed over master, and the
+// dynamic reservations are later correctly offered to the framework.
+TEST_F(ReservationTest, MasterFailover)
+{
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("role");
+
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.allocation_interval = Milliseconds(50);
+ masterFlags.roles = frameworkInfo.role();
+
+ Try<PID<Master>> master1 = StartMaster(masterFlags);
+ ASSERT_SOME(master1);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.resources = "cpus:8;mem:2048";
+
+ StandaloneMasterDetector detector(master1.get());
+
+ Try<PID<Slave>> slave = StartSlave(&detector, slaveFlags);
+ ASSERT_SOME(slave);
+
+ MockScheduler sched;
+ TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo);
+
+ Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
+ Resources reserved = unreserved.flatten(
+ frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+ // We use this to capture offers from 'resourceOffers'.
+ Future<vector<Offer>> offers;
+
+ // The expectation for the first offer.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ driver.start();
+
+ // In the first offer, expect an offer with unreserved resources.
+ AWAIT_READY(offers);
+
+ ASSERT_EQ(offers.get().size(), 1u);
+ Offer offer = offers.get()[0];
+
+ Future<CheckpointResourcesMessage> checkpointResources =
+ FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, slave.get());
+
+ // We use the filter explicitly here so that the resources
+ // will not be filtered for 5 seconds (by default).
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ // Reserve the resources.
+ driver.acceptOffers({offer.id()}, {RESERVE(reserved)}, filters);
+
+ // Expect to receive the CheckpointResourcesMessage.
+ AWAIT_READY(checkpointResources);
+
+ // This is to make sure CheckpointResourcesMessage is processed.
+ process::Clock::pause();
+ process::Clock::settle();
+ process::Clock::resume();
+
+ EXPECT_CALL(sched, disconnected(&driver));
+
+ // Simulate master failover by restarting the master.
+ Stop(master1.get());
+
+ Try<PID<Master>> master2 = StartMaster(masterFlags);
+ ASSERT_SOME(master2);
+
+ Future<SlaveReregisteredMessage> slaveReregistered =
+ FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+ // The expectation for the next offer.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ // Simulate a new master detected event on the slave so that the
+ // slave will do a re-registration.
+ detector.appoint(master2.get());
+
+ // Wait for slave to confirm re-registration.
+ AWAIT_READY(slaveReregistered);
+
+ // In the next offer, expect an offer with the reserved resources.
+ AWAIT_READY(offers);
+
+ ASSERT_EQ(offers.get().size(), 1u);
+ offer = offers.get()[0];
+
+ EXPECT_TRUE(Resources(offer.resources()).contains(reserved));
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// NOTE: The following tests cover the notion of compatible resources
+// on slave restart. Newly declared resources are compatible if they
+// include the checkpointed resources. For example, suppose a slave
+// initially declares "cpus:8;mem:4096", and "cpus:8;mem:2048" gets
+// reserved and thus checkpointed. In order to be compatible, the
+// newly declared resources must include "cpus:8;mem:2048". For
+// example, "cpus:12;mem:2048" would be considered compatible.
+
+
+// This test verifies that a slave can restart as long as the
+// checkpointed resources it recovers are compatible with the slave
+// resources specified using the '--resources' flag.
+TEST_F(ReservationTest, CompatibleCheckpointedResources)
+{
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("role");
+
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.allocation_interval = Milliseconds(50);
+ masterFlags.roles = frameworkInfo.role();
+
+ Try<PID<Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.resources = "cpus:8;mem:4096";
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ TestContainerizer containerizer(&exec);
+
+ StandaloneMasterDetector detector(master.get());
+
+ MockSlave slave1(slaveFlags, &detector, &containerizer);
+ spawn(slave1);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
+ Resources reserved = unreserved.flatten(
+ frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+ // We use this to capture offers from 'resourceOffers'.
+ Future<vector<Offer>> offers;
+
+ // The expectation for the first offer.
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ driver.start();
+
+ // In the first offer, expect an offer with unreserved resources.
+ AWAIT_READY(offers);
+
+ ASSERT_EQ(offers.get().size(), 1u);
+ Offer offer = offers.get()[0];
+
+ EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+
+ Future<CheckpointResourcesMessage> checkpointResources =
+ FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+ // We use the filter explicitly here so that the resources
+ // will not be filtered for 5 seconds (by default).
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ // Reserve the resources.
+ driver.acceptOffers({offer.id()}, {RESERVE(reserved)}, filters);
+
+ // Wait until CheckpointResourcesMessage arrives.
+ AWAIT_READY(checkpointResources);
+
+ terminate(slave1);
+ wait(slave1);
+
+ // Simulate a reboot of the slave machine by modifying the boot ID.
+ ASSERT_SOME(os::write(slave::paths::getBootIdPath(
+ slave::paths::getMetaRootDir(slaveFlags.work_dir)),
+ "rebooted! ;)"));
+
+ // Change the slave resources so that it is compatible with the
+ // checkpointed resources.
+ slaveFlags.resources = "cpus:12;mem:2048";
+
+ MockSlave slave2(slaveFlags, &detector, &containerizer);
+
+ Future<Future<Nothing>> recover;
+ EXPECT_CALL(slave2, __recover(_))
+ .WillOnce(DoAll(FutureArg<0>(&recover), Return()));
+
+ spawn(slave2);
+
+ // Wait for 'recover' to finish.
+ AWAIT_READY(recover);
+
+ // Expect 'recover' to have completed successfully.
+ AWAIT_READY(recover.get());
+
+ terminate(slave2);
+ wait(slave2);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test verifies that a slave can restart as long as the
+// checkpointed resources (including persistent volumes) it recovers
+// are compatible with the slave resources specified using the
+// '--resources' flag.
+TEST_F(ReservationTest, CompatibleCheckpointedResourcesWithPersistentVolumes)
+{
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("role");
+
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.allocation_interval = Milliseconds(50);
+ masterFlags.roles = frameworkInfo.role();
+
+ Try<PID<Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.resources = "cpus:8;mem:4096";
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ TestContainerizer containerizer(&exec);
+
+ StandaloneMasterDetector detector(master.get());
+
+ MockSlave slave1(slaveFlags, &detector, &containerizer);
+ spawn(slave1);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
+ Resources reserved = unreserved.flatten(
+ frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+ Resource unreservedDisk = Resources::parse("disk", "1024", "*").get();
+ Resource reservedDisk = unreservedDisk;
+ reservedDisk.set_role(frameworkInfo.role());
+ reservedDisk.mutable_reservation()->CopyFrom(
+ createReservationInfo(frameworkInfo.principal()));
+
+ Resource volume = reservedDisk;
+ volume.mutable_disk()->CopyFrom(
+ createDiskInfo("persistence_id", "container_path"));
+
+ // We use this to capture offers from 'resourceOffers'.
+ Future<vector<Offer>> offers;
+
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ driver.start();
+
+ AWAIT_READY(offers);
+
+ ASSERT_EQ(offers.get().size(), 1u);
+ Offer offer = offers.get()[0];
+
+ EXPECT_TRUE(
+ Resources(offer.resources()).contains(unreserved + unreservedDisk));
+
+ Future<CheckpointResourcesMessage> message2 =
+ FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+ Future<CheckpointResourcesMessage> message1 =
+ FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+ // We use the filter explicitly here so that the resources
+ // will not be filtered for 5 seconds (by default).
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ // Reserve the resources and create the volume.
+ driver.acceptOffers(
+ {offer.id()},
+ {RESERVE(reserved + reservedDisk), CREATE(volume)},
+ filters);
+
+ // NOTE: Currently, we send one message per operation. But this is
+ // an implementation detail which is subject to change.
+ AWAIT_READY(message1);
+ EXPECT_EQ(Resources(message1.get().resources()), reserved + reservedDisk);
+
+ AWAIT_READY(message2);
+ EXPECT_EQ(Resources(message2.get().resources()), reserved + volume);
+
+ terminate(slave1);
+ wait(slave1);
+
+ // Simulate a reboot of the slave machine by modifying the boot ID.
+ ASSERT_SOME(os::write(slave::paths::getBootIdPath(
+ slave::paths::getMetaRootDir(slaveFlags.work_dir)),
+ "rebooted! ;)"));
+
+ // Change the slave resources so that it is compatible with the
+ // checkpointed resources.
+ slaveFlags.resources = "cpus:12;mem:2048;disk:1024";
+
+ MockSlave slave2(slaveFlags, &detector, &containerizer);
+
+ Future<Future<Nothing>> recover;
+ EXPECT_CALL(slave2, __recover(_))
+ .WillOnce(DoAll(FutureArg<0>(&recover), Return()));
+
+ spawn(slave2);
+
+ // Wait for 'recover' to finish.
+ AWAIT_READY(recover);
+
+ // Expect that 'recover' will complete successfully.
+ AWAIT_READY(recover.get());
+
+ terminate(slave2);
+ wait(slave2);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
+
+// This test verifies that a slave will refuse to start if the
+// checkpointed resources it recovers are not compatible with the
+// slave resources specified using the '--resources' flag.
+TEST_F(ReservationTest, IncompatibleCheckpointedResources)
+{
+ FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_role("role");
+
+ master::Flags masterFlags = CreateMasterFlags();
+ masterFlags.allocation_interval = Milliseconds(50);
+ masterFlags.roles = frameworkInfo.role();
+
+ Try<PID<Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.resources = "cpus:8;mem:4096";
+
+ MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+ TestContainerizer containerizer(&exec);
+
+ StandaloneMasterDetector detector(master.get());
+
+ MockSlave slave1(slaveFlags, &detector, &containerizer);
+ spawn(slave1);
+
+ MockScheduler sched;
+ MesosSchedulerDriver driver(
+ &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+ Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
+ Resources reserved = unreserved.flatten(
+ frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+ // We use this to capture offers from 'resourceOffers'.
+ Future<vector<Offer>> offers;
+
+ EXPECT_CALL(sched, resourceOffers(&driver, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(Return());
+
+ EXPECT_CALL(sched, registered(&driver, _, _));
+
+ driver.start();
+
+ // In the first offer, expect an offer with unreserved resources.
+ AWAIT_READY(offers);
+
+ ASSERT_EQ(offers.get().size(), 1u);
+ Offer offer = offers.get()[0];
+
+ EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+
+ Future<CheckpointResourcesMessage> checkpointResources =
+ FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+ // We use the filter explicitly here so that the resources
+ // will not be filtered for 5 seconds (by default).
+ Filters filters;
+ filters.set_refuse_seconds(0);
+
+ // Reserve the resources.
+ driver.acceptOffers({offer.id()}, {RESERVE(reserved)}, filters);
+
+ // Wait for CheckpointResourcesMessage to be delivered.
+ AWAIT_READY(checkpointResources);
+
+ terminate(slave1);
+ wait(slave1);
+
+ // Simulate a reboot of the slave machine by modifying the boot ID.
+ ASSERT_SOME(os::write(slave::paths::getBootIdPath(
+ slave::paths::getMetaRootDir(slaveFlags.work_dir)),
+ "rebooted! ;)"));
+
+ // Change the slave resources so that it's not compatible with the
+ // checkpointed resources.
+ slaveFlags.resources = "cpus:4;mem:2048";
+
+ MockSlave slave2(slaveFlags, &detector, &containerizer);
+
+ Future<Future<Nothing>> recover;
+ EXPECT_CALL(slave2, __recover(_))
+ .WillOnce(DoAll(FutureArg<0>(&recover), Return()));
+
+ spawn(slave2);
+
+ // Wait for 'recover' to finish.
+ AWAIT_READY(recover);
+
+ // Expect 'recover' to have failed.
+ AWAIT_FAILED(recover.get());
+
+ terminate(slave2);
+ wait(slave2);
+
+ driver.stop();
+ driver.join();
+
+ Shutdown();
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
[2/2] mesos git commit: Fixed compiler warnings in reservation tests.
Posted by ji...@apache.org.
Fixed compiler warnings in reservation tests.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9db83273
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9db83273
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9db83273
Branch: refs/heads/master
Commit: 9db83273ea668c47206db3b0cb613afd7855abcb
Parents: 96e203f
Author: Jie Yu <yu...@gmail.com>
Authored: Wed May 13 11:02:38 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed May 13 11:02:38 2015 -0700
----------------------------------------------------------------------
src/tests/reservation_tests.cpp | 40 ++++++++++++++++++------------------
1 file changed, 20 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9db83273/src/tests/reservation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_tests.cpp b/src/tests/reservation_tests.cpp
index 9020d04..755a375 100644
--- a/src/tests/reservation_tests.cpp
+++ b/src/tests/reservation_tests.cpp
@@ -125,7 +125,7 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
// In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1);
+ ASSERT_EQ(1u, offers.get().size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -140,7 +140,7 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
// In the next offer, expect an offer with reserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1);
+ ASSERT_EQ(1u, offers.get().size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
@@ -155,7 +155,7 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
// In the next offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1);
+ ASSERT_EQ(1u, offers.get().size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -217,7 +217,7 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
// In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1);
+ ASSERT_EQ(1u, offers.get().size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -247,7 +247,7 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
// In the next offer, expect an offer with reserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1);
+ ASSERT_EQ(1u, offers.get().size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
@@ -267,7 +267,7 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
// In the next offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1);
+ ASSERT_EQ(1u, offers.get().size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -336,7 +336,7 @@ TEST_F(ReservationTest, ReserveShareWithinRole)
// In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1);
+ ASSERT_EQ(1u, offers.get().size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -356,7 +356,7 @@ TEST_F(ReservationTest, ReserveShareWithinRole)
// In the next offer, expect an offer with reserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1);
+ ASSERT_EQ(1u, offers.get().size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
@@ -381,7 +381,7 @@ TEST_F(ReservationTest, ReserveShareWithinRole)
// framework1.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1);
+ ASSERT_EQ(1u, offers.get().size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
@@ -448,7 +448,7 @@ TEST_F(ReservationTest, DropReserveTooLarge)
// In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1);
+ ASSERT_EQ(1u, offers.get().size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -476,7 +476,7 @@ TEST_F(ReservationTest, DropReserveTooLarge)
// resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1);
+ ASSERT_EQ(1u, offers.get().size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -540,7 +540,7 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
// resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1);
+ ASSERT_EQ(1u, offers.get().size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(staticallyReserved));
@@ -565,7 +565,7 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
// reserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1);
+ ASSERT_EQ(1u, offers.get().size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(staticallyReserved));
@@ -627,7 +627,7 @@ TEST_F(ReservationTest, SendingCheckpointResourcesMessage)
// 'unreserved2'.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1u);
+ ASSERT_EQ(1u, offers.get().size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved1 + unreserved2));
@@ -723,7 +723,7 @@ TEST_F(ReservationTest, ResourcesCheckpointing)
// Expect an offer with the unreserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1u);
+ ASSERT_EQ(1u, offers.get().size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -814,7 +814,7 @@ TEST_F(ReservationTest, MasterFailover)
// In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1u);
+ ASSERT_EQ(1u, offers.get().size());
Offer offer = offers.get()[0];
Future<CheckpointResourcesMessage> checkpointResources =
@@ -863,7 +863,7 @@ TEST_F(ReservationTest, MasterFailover)
// In the next offer, expect an offer with the reserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1u);
+ ASSERT_EQ(1u, offers.get().size());
offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(reserved));
@@ -933,7 +933,7 @@ TEST_F(ReservationTest, CompatibleCheckpointedResources)
// In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1u);
+ ASSERT_EQ(1u, offers.get().size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -1047,7 +1047,7 @@ TEST_F(ReservationTest, CompatibleCheckpointedResourcesWithPersistentVolumes)
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1u);
+ ASSERT_EQ(1u, offers.get().size());
Offer offer = offers.get()[0];
EXPECT_TRUE(
@@ -1163,7 +1163,7 @@ TEST_F(ReservationTest, IncompatibleCheckpointedResources)
// In the first offer, expect an offer with unreserved resources.
AWAIT_READY(offers);
- ASSERT_EQ(offers.get().size(), 1u);
+ ASSERT_EQ(1u, offers.get().size());
Offer offer = offers.get()[0];
EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));