You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2015/05/13 20:04:29 UTC

[1/2] mesos git commit: Persisted the reservation state on the slave.

Repository: mesos
Updated Branches:
  refs/heads/master 860bb0fc3 -> 9db83273e


Persisted the reservation state on the slave.

Review: https://reviews.apache.org/r/32398


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/96e203f7
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/96e203f7
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/96e203f7

Branch: refs/heads/master
Commit: 96e203f75b1529586768d512f7ce09d8034fa02e
Parents: 860bb0f
Author: Michael Park <mc...@gmail.com>
Authored: Wed May 13 10:43:06 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed May 13 10:54:22 2015 -0700

----------------------------------------------------------------------
 include/mesos/resources.hpp     |   3 +
 src/common/resources.cpp        |   6 +
 src/common/resources_utils.cpp  |  42 ++-
 src/tests/reservation_tests.cpp | 705 +++++++++++++++++++++++++++++++++--
 4 files changed, 709 insertions(+), 47 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/96e203f7/include/mesos/resources.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/resources.hpp b/include/mesos/resources.hpp
index 4c036d3..f44cb62 100644
--- a/include/mesos/resources.hpp
+++ b/include/mesos/resources.hpp
@@ -114,6 +114,9 @@ public:
   // Tests if the given Resource object is unreserved.
   static bool isUnreserved(const Resource& resource);
 
+  // Tests if the given Resource object is dynamically reserved.
+  static bool isDynamicallyReserved(const Resource& resource);
+
   // Returns the summed up Resources given a hashmap<Key, Resources>.
   //
   // NOTE: While scalar resources such as "cpus" sum correctly,

http://git-wip-us.apache.org/repos/asf/mesos/blob/96e203f7/src/common/resources.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources.cpp b/src/common/resources.cpp
index 235930f..843a06d 100644
--- a/src/common/resources.cpp
+++ b/src/common/resources.cpp
@@ -502,6 +502,12 @@ bool Resources::isUnreserved(const Resource& resource)
 }
 
 
+bool Resources::isDynamicallyReserved(const Resource& resource)
+{
+  return resource.has_reservation();  // ReservationInfo present => dynamic.
+}
+
+
 /////////////////////////////////////////////////
 // Public member functions.
 /////////////////////////////////////////////////

http://git-wip-us.apache.org/repos/asf/mesos/blob/96e203f7/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
index fe04d57..8d6aaaa 100644
--- a/src/common/resources_utils.cpp
+++ b/src/common/resources_utils.cpp
@@ -26,11 +26,21 @@ namespace mesos {
 
 bool needCheckpointing(const Resource& resource)
 {
-  // TODO(mpark): Consider framework reservations.
-  return Resources::isPersistentVolume(resource);
+  return Resources::isDynamicallyReserved(resource) ||
+         Resources::isPersistentVolume(resource);  // Both re-applied on recovery.
 }
 
 
+// NOTE: We effectively duplicate the logic in 'Resources::apply',
+// which is less than ideal. But we cannot simply create an
+// 'Offer::Operation' and invoke 'Resources::apply' here: a 'RESERVE'
+// operation requires the specified resources to be dynamically
+// reserved only (not persistent volumes), and 'CREATE' requires the
+// specified resources to be already dynamically reserved. These
+// requirements are violated when we try to infer dynamically
+// reserved persistent volumes.
+// TODO(mpark): Consider introducing an atomic 'RESERVE_AND_CREATE'
+// operation to solve this problem.
 Try<Resources> applyCheckpointedResources(
     const Resources& resources,
     const Resources& checkpointedResources)
@@ -42,23 +52,25 @@ Try<Resources> applyCheckpointedResources(
       return Error("Unexpected checkpointed resources " + stringify(resource));
     }
 
-    // TODO(jieyu): Apply RESERVE operation if 'resource' is
-    // dynamically reserved.
+    Resource stripped = resource;
 
-    if (Resources::isPersistentVolume(resource)) {
-      Offer::Operation create;
-      create.set_type(Offer::Operation::CREATE);
-      create.mutable_create()->add_volumes()->CopyFrom(resource);
+    if (Resources::isDynamicallyReserved(resource)) {
+      stripped.set_role("*");
+      stripped.clear_reservation();
+    }
 
-      Try<Resources> applied = totalResources.apply(create);
-      if (applied.isError()) {
-        return Error(
-            "Cannot find transition for checkpointed resource " +
-            stringify(resource));
-      }
+    if (Resources::isPersistentVolume(resource)) {
+      stripped.clear_disk();
+    }
 
-      totalResources = applied.get();
+    if (!totalResources.contains(stripped)) {
+      return Error(
+          "Incompatible slave resources: " + stringify(totalResources) +
+          " does not contain " + stringify(stripped));
     }
+
+    totalResources -= stripped;
+    totalResources += resource;
   }
 
   return totalResources;

http://git-wip-us.apache.org/repos/asf/mesos/blob/96e203f7/src/tests/reservation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_tests.cpp b/src/tests/reservation_tests.cpp
index c0edf3c..9020d04 100644
--- a/src/tests/reservation_tests.cpp
+++ b/src/tests/reservation_tests.cpp
@@ -102,8 +102,8 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
   MesosSchedulerDriver driver(
       &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
 
-  // We use the filter explicitly here so that the resources
-  // will not be filtered for 5 seconds (by default).
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (by default).
   Filters filters;
   filters.set_refuse_seconds(0);
 
@@ -122,7 +122,7 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
 
   driver.start();
 
-  // In the first offer, expect an offer with the unreserved resources.
+  // In the first offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
   ASSERT_EQ(offers.get().size(), 1);
@@ -137,7 +137,7 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
   // Reserve the resources.
   driver.acceptOffers({offer.id()}, {RESERVE(dynamicallyReserved)}, filters);
 
-  // In the next offer, expect an offer with the reserved resources.
+  // In the next offer, expect an offer with reserved resources.
   AWAIT_READY(offers);
 
   ASSERT_EQ(offers.get().size(), 1);
@@ -152,7 +152,7 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
   // Unreserve the resources.
   driver.acceptOffers({offer.id()}, {UNRESERVE(dynamicallyReserved)}, filters);
 
-  // In the next offer, expect an offer with the unreserved resources.
+  // In the next offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
   ASSERT_EQ(offers.get().size(), 1);
@@ -167,10 +167,10 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
 }
 
 
-// This tests that a framework can send back a Reserve followed by
-// a LaunchTasks offer operation as a response to an offer, which
-// updates the resources in the allocator then proceeds to launch
-// the task with the reserved resources. The reserved resources are
+// This tests that a framework can send back a Reserve followed by a
+// LaunchTasks offer operation as a response to an offer, which
+// updates the resources in the allocator then proceeds to launch the
+// task with the reserved resources. The reserved resources are
 // reoffered to the framework on task completion. The framework then
 // sends back an Unreserved offer operation to unreserve the reserved
 // resources. We test that the framework receives the unreserved
@@ -214,7 +214,7 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
 
   driver.start();
 
-  // In the first offer, expect an offer with the unreserved resources.
+  // In the first offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
   ASSERT_EQ(offers.get().size(), 1);
@@ -244,7 +244,7 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
       {RESERVE(dynamicallyReserved),
       LAUNCH({taskInfo})});
 
-  // In the next offer, expect an offer with the reserved resources.
+  // In the next offer, expect an offer with reserved resources.
   AWAIT_READY(offers);
 
   ASSERT_EQ(offers.get().size(), 1);
@@ -256,15 +256,15 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
   EXPECT_CALL(sched, resourceOffers(&driver, _))
     .WillOnce(FutureArg<1>(&offers));
 
-  // We use the filter explicitly here so that the resources
-  // will not be filtered for 5 seconds (by default).
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (by default).
   Filters filters;
   filters.set_refuse_seconds(0);
 
   // Unreserve the resources.
   driver.acceptOffers({offer.id()}, {UNRESERVE(dynamicallyReserved)}, filters);
 
-  // In the next offer, expect an offer with the unreserved resources.
+  // In the next offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
   ASSERT_EQ(offers.get().size(), 1);
@@ -280,11 +280,11 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
 
 
 // This test launches 2 frameworks in the same role. framework1
-// reserves resources by sending back a Reserve offer operation.
-// We first test that framework1 receives the reserved resources,
-// then on the next resource offer, framework1 declines the offer.
-// This should lead to framework2 receiving the resources that
-// framework1 reserved.
+// reserves resources by sending back a Reserve offer operation. We
+// first test that framework1 receives the reserved resources, then on
+// the next resource offer, framework1 declines the offer. This
+// should lead to framework2 receiving the resources that framework1
+// reserved.
 TEST_F(ReservationTest, ReserveShareWithinRole)
 {
   string role = "role";
@@ -333,7 +333,7 @@ TEST_F(ReservationTest, ReserveShareWithinRole)
 
   driver1.start();
 
-  // In the first offer, expect an offer with the unreserved resources.
+  // In the first offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
   ASSERT_EQ(offers.get().size(), 1);
@@ -345,15 +345,15 @@ TEST_F(ReservationTest, ReserveShareWithinRole)
   EXPECT_CALL(sched1, resourceOffers(&driver1, _))
     .WillOnce(FutureArg<1>(&offers));
 
-  // We use the filter explicitly here so that the resources
-  // will not be filtered for 5 seconds (by default).
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (by default).
   Filters filters;
   filters.set_refuse_seconds(0);
 
   // Reserve the resources.
   driver1.acceptOffers({offer.id()}, {RESERVE(dynamicallyReserved)}, filters);
 
-  // In the next offer, expect an offer with the reserved resources.
+  // In the next offer, expect an offer with reserved resources.
   AWAIT_READY(offers);
 
   ASSERT_EQ(offers.get().size(), 1);
@@ -445,7 +445,7 @@ TEST_F(ReservationTest, DropReserveTooLarge)
 
   driver.start();
 
-  // In the first offer, expect an offer with the unreserved resources.
+  // In the first offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
   ASSERT_EQ(offers.get().size(), 1);
@@ -461,8 +461,8 @@ TEST_F(ReservationTest, DropReserveTooLarge)
   EXPECT_CALL(allocator, updateAllocation(_, _, _))
     .Times(0);
 
-  // We use the filter explicitly here so that the resources
-  // will not be filtered for 5 seconds (by default).
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (by default).
   Filters filters;
   filters.set_refuse_seconds(0);
 
@@ -488,8 +488,8 @@ TEST_F(ReservationTest, DropReserveTooLarge)
 }
 
 
-// This tests that an attempt to dynamically reserve statically reserved
-// resources is dropped.
+// This tests that an attempt to dynamically reserve statically
+// reserved resources is dropped.
 TEST_F(ReservationTest, DropReserveStaticReservation)
 {
   TestAllocator<> allocator;
@@ -536,8 +536,8 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
 
   driver.start();
 
-  // In the first offer, expect an offer with the statically
-  // reserved resources.
+  // In the first offer, expect an offer with the statically reserved
+  // resources.
   AWAIT_READY(offers);
 
   ASSERT_EQ(offers.get().size(), 1);
@@ -561,8 +561,8 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
   // Attempt to reserve the statically reserved resources.
   driver.acceptOffers({offer.id()}, {RESERVE(dynamicallyReserved)}, filters);
 
-  // In the next offer, still expect an offer with the statically reserved
-  // resources.
+  // In the next offer, still expect an offer with the statically
+  // reserved resources.
   AWAIT_READY(offers);
 
   ASSERT_EQ(offers.get().size(), 1);
@@ -576,6 +576,647 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
   Shutdown();
 }
 
+
+// This test verifies that CheckpointResourcesMessages are sent to the
+// slave when a framework reserves/unreserves resources, and the
+// resources in the messages correctly reflect the resources that need
+// to be checkpointed on the slave.
+TEST_F(ReservationTest, SendingCheckpointResourcesMessage)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role");
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+  masterFlags.roles = frameworkInfo.role();
+
+  Try<PID<Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = "cpus:8;mem:4096";
+
+  Try<PID<Slave>> slave = StartSlave(slaveFlags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Resources unreserved1 = Resources::parse("cpus:8").get();
+  Resources reserved1 = unreserved1.flatten(
+      frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+  Resources unreserved2 = Resources::parse("mem:2048").get();
+  Resources reserved2 = unreserved2.flatten(
+      frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+  // We use this to capture offers from 'resourceOffers'.
+  Future<vector<Offer>> offers;
+
+  // The expectation for the first offer.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  driver.start();
+
+  // In the first offer, expect the sum of 'unreserved1' and
+  // 'unreserved2'.
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  Offer offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved1 + unreserved2));
+
+  Future<CheckpointResourcesMessage> message3 =
+    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+  Future<CheckpointResourcesMessage> message2 =
+    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+  Future<CheckpointResourcesMessage> message1 =
+    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+  // We use the filter explicitly here so that the resources will not
+  // be filtered for 5 seconds (by default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // Attempt to reserve and unreserve resources.
+  driver.acceptOffers(
+      {offer.id()},
+      {RESERVE(reserved1), RESERVE(reserved2), UNRESERVE(reserved1)},
+      filters);
+
+  // NOTE: Currently, we send one message per operation. But this is
+  // an implementation detail which is subject to change.
+
+  // Expect the 'RESERVE(reserved1)' as the first message.
+  // The checkpointed resources should correspond to 'reserved1'.
+  AWAIT_READY(message1);
+  EXPECT_EQ(reserved1, Resources(message1.get().resources()));
+
+  // Expect the 'RESERVE(reserved2)' as the second message.
+  // The checkpointed resources should correspond to
+  // 'reserved1 + reserved2'.
+  AWAIT_READY(message2);
+  EXPECT_EQ(reserved1 + reserved2, Resources(message2.get().resources()));
+
+  // Expect the 'UNRESERVE(reserved1)' as the third message.
+  // The checkpointed resources should correspond to 'reserved2'.
+  AWAIT_READY(message3);
+  EXPECT_EQ(reserved2, Resources(message3.get().resources()));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that the slave checkpoints the resources for
+// dynamic reservations to the disk, recovers them upon restart, and
+// sends them to the master during re-registration.
+TEST_F(ReservationTest, ResourcesCheckpointing)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role");
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+  masterFlags.roles = frameworkInfo.role();
+
+  Try<PID<Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.recover = "reconnect";
+  slaveFlags.resources = "cpus:8;mem:4096";
+
+  Try<PID<Slave>> slave = StartSlave(slaveFlags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
+  Resources reserved = unreserved.flatten(
+      frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+  // We use this to capture offers from 'resourceOffers'.
+  Future<vector<Offer>> offers;
+
+  // The expectation for the offer.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  driver.start();
+
+  // Expect an offer with the unreserved resources.
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  Offer offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+
+  Future<CheckpointResourcesMessage> checkpointResources =
+    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, slave.get());
+
+  // We use the filter explicitly here so that the resources
+  // will not be filtered for 5 seconds (by default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // Reserve the resources.
+  driver.acceptOffers({offer.id()}, {RESERVE(reserved)}, filters);
+
+  // Expect to receive the 'CheckpointResourcesMessage'.
+  AWAIT_READY(checkpointResources);
+
+  // Restart the slave without shutting down.
+  Stop(slave.get(), false);
+
+  Future<ReregisterSlaveMessage> reregisterSlave =
+    FUTURE_PROTOBUF(ReregisterSlaveMessage(), _, _);
+
+  Future<Nothing> slaveRecover = FUTURE_DISPATCH(_, &Slave::recover);
+
+  slave = StartSlave(slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Expect the slave to recover.
+  AWAIT_READY(slaveRecover);
+
+  // Expect to receive the 'ReregisterSlaveMessage' containing the
+  // reserved resources.
+  AWAIT_READY(reregisterSlave);
+  EXPECT_EQ(reserved,
+            Resources(reregisterSlave.get().checkpointed_resources()));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies the case where a slave that has checkpointed
+// dynamic reservations reregisters with a failed over master, and the
+// dynamic reservations are later correctly offered to the framework.
+TEST_F(ReservationTest, MasterFailover)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role");
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+  masterFlags.roles = frameworkInfo.role();
+
+  Try<PID<Master>> master1 = StartMaster(masterFlags);
+  ASSERT_SOME(master1);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = "cpus:8;mem:2048";
+
+  StandaloneMasterDetector detector(master1.get());
+
+  Try<PID<Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  MockScheduler sched;
+  TestingMesosSchedulerDriver driver(&sched, &detector, frameworkInfo);
+
+  Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
+  Resources reserved = unreserved.flatten(
+      frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+  // We use this to capture offers from 'resourceOffers'.
+  Future<vector<Offer>> offers;
+
+  // The expectation for the first offer.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  driver.start();
+
+  // In the first offer, expect an offer with unreserved resources.
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  Offer offer = offers.get()[0];
+
+  Future<CheckpointResourcesMessage> checkpointResources =
+    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, slave.get());
+
+  // We use the filter explicitly here so that the resources
+  // will not be filtered for 5 seconds (by default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // Reserve the resources.
+  driver.acceptOffers({offer.id()}, {RESERVE(reserved)}, filters);
+
+  // Expect to receive the CheckpointResourcesMessage.
+  AWAIT_READY(checkpointResources);
+
+  // This is to make sure CheckpointResourcesMessage is processed.
+  process::Clock::pause();
+  process::Clock::settle();
+  process::Clock::resume();
+
+  EXPECT_CALL(sched, disconnected(&driver));
+
+  // Simulate master failover by restarting the master.
+  Stop(master1.get());
+
+  Try<PID<Master>> master2 = StartMaster(masterFlags);
+  ASSERT_SOME(master2);
+
+  Future<SlaveReregisteredMessage> slaveReregistered =
+    FUTURE_PROTOBUF(SlaveReregisteredMessage(), _, _);
+
+  // The expectation for the next offer.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers));
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  // Simulate a new master detected event on the slave so that the
+  // slave will do a re-registration.
+  detector.appoint(master2.get());
+
+  // Wait for slave to confirm re-registration.
+  AWAIT_READY(slaveReregistered);
+
+  // In the next offer, expect an offer with the reserved resources.
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(reserved));
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// NOTE: The following tests cover the notion of compatible resources
+// on slave restart. Newly declared resources are compatible if they
+// include the checkpointed resources. For example, suppose a slave
+// initially declares "cpus:8;mem:4096", and "cpus:8;mem:2048" gets
+// reserved and thus checkpointed. In order to be compatible, the
+// newly declared resources must include "cpus:8;mem:2048". For
+// example, "cpus:12;mem:2048" would be considered compatible.
+
+
+// This test verifies that a slave can restart as long as the
+// checkpointed resources it recovers are compatible with the slave
+// resources specified using the '--resources' flag.
+TEST_F(ReservationTest, CompatibleCheckpointedResources)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role");
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+  masterFlags.roles = frameworkInfo.role();
+
+  Try<PID<Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = "cpus:8;mem:4096";
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  StandaloneMasterDetector detector(master.get());
+
+  MockSlave slave1(slaveFlags, &detector, &containerizer);
+  spawn(slave1);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
+  Resources reserved = unreserved.flatten(
+      frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+  // We use this to capture offers from 'resourceOffers'.
+  Future<vector<Offer>> offers;
+
+  // The expectation for the first offer; later re-offers are ignored.
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  driver.start();
+
+  // In the first offer, expect an offer with unreserved resources.
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  Offer offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+
+  Future<CheckpointResourcesMessage> checkpointResources =
+    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+  // We use the filter explicitly here so that the resources
+  // will not be filtered for 5 seconds (by default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // Reserve the resources.
+  driver.acceptOffers({offer.id()}, {RESERVE(reserved)}, filters);
+
+  // Wait until CheckpointResourcesMessage arrives.
+  AWAIT_READY(checkpointResources);
+
+  terminate(slave1);
+  wait(slave1);
+
+  // Simulate a reboot of the slave machine by modifying the boot ID.
+  ASSERT_SOME(os::write(slave::paths::getBootIdPath(
+      slave::paths::getMetaRootDir(slaveFlags.work_dir)),
+      "rebooted! ;)"));
+
+  // Change the slave resources so that it is compatible with the
+  // checkpointed resources.
+  slaveFlags.resources = "cpus:12;mem:2048";
+
+  MockSlave slave2(slaveFlags, &detector, &containerizer);
+
+  Future<Future<Nothing>> recover;
+  EXPECT_CALL(slave2, __recover(_))
+    .WillOnce(DoAll(FutureArg<0>(&recover), Return()));
+
+  spawn(slave2);
+
+  // Wait for 'recover' to finish.
+  AWAIT_READY(recover);
+
+  // Expect 'recover' to have completed successfully.
+  AWAIT_READY(recover.get());
+
+  terminate(slave2);
+  wait(slave2);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that a slave can restart as long as the
+// checkpointed resources (including persistent volumes) it recovers
+// are compatible with the slave resources specified using the
+// '--resources' flag.
+TEST_F(ReservationTest, CompatibleCheckpointedResourcesWithPersistentVolumes)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role");
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+  masterFlags.roles = frameworkInfo.role();
+
+  Try<PID<Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = "cpus:8;mem:4096";
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  StandaloneMasterDetector detector(master.get());
+
+  MockSlave slave1(slaveFlags, &detector, &containerizer);
+  spawn(slave1);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
+  Resources reserved = unreserved.flatten(
+      frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+  Resource unreservedDisk = Resources::parse("disk", "1024", "*").get();
+  Resource reservedDisk = unreservedDisk;
+  reservedDisk.set_role(frameworkInfo.role());
+  reservedDisk.mutable_reservation()->CopyFrom(
+      createReservationInfo(frameworkInfo.principal()));
+
+  Resource volume = reservedDisk;
+  volume.mutable_disk()->CopyFrom(
+      createDiskInfo("persistence_id", "container_path"));
+
+  // We use this to capture offers from 'resourceOffers'.
+  Future<vector<Offer>> offers;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  driver.start();
+
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  Offer offer = offers.get()[0];
+
+  EXPECT_TRUE(
+      Resources(offer.resources()).contains(unreserved + unreservedDisk));
+
+  Future<CheckpointResourcesMessage> message2 =
+    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+  Future<CheckpointResourcesMessage> message1 =
+    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+  // We use the filter explicitly here so that the resources
+  // will not be filtered for 5 seconds (by default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // Reserve the resources and create the volume.
+  driver.acceptOffers(
+      {offer.id()},
+      {RESERVE(reserved + reservedDisk), CREATE(volume)},
+      filters);
+
+  // NOTE: Currently, we send one message per operation. But this is
+  // an implementation detail which is subject to change.
+  AWAIT_READY(message1);
+  EXPECT_EQ(reserved + reservedDisk, Resources(message1.get().resources()));
+
+  AWAIT_READY(message2);
+  EXPECT_EQ(reserved + volume, Resources(message2.get().resources()));
+
+  terminate(slave1);
+  wait(slave1);
+
+  // Simulate a reboot of the slave machine by modifying the boot ID.
+  ASSERT_SOME(os::write(slave::paths::getBootIdPath(
+      slave::paths::getMetaRootDir(slaveFlags.work_dir)),
+      "rebooted! ;)"));
+
+  // Change the slave resources so that it is compatible with the
+  // checkpointed resources.
+  slaveFlags.resources = "cpus:12;mem:2048;disk:1024";
+
+  MockSlave slave2(slaveFlags, &detector, &containerizer);
+
+  Future<Future<Nothing>> recover;
+  EXPECT_CALL(slave2, __recover(_))
+    .WillOnce(DoAll(FutureArg<0>(&recover), Return()));
+
+  spawn(slave2);
+
+  // Wait for 'recover' to finish.
+  AWAIT_READY(recover);
+
+  // Expect that 'recover' will complete successfully.
+  AWAIT_READY(recover.get());
+
+  terminate(slave2);
+  wait(slave2);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
+
+// This test verifies that a slave will refuse to start if the
+// checkpointed resources it recovers are not compatible with the
+// slave resources specified using the '--resources' flag.
+TEST_F(ReservationTest, IncompatibleCheckpointedResources)
+{
+  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("role");
+
+  master::Flags masterFlags = CreateMasterFlags();
+  masterFlags.allocation_interval = Milliseconds(50);
+  masterFlags.roles = frameworkInfo.role();
+
+  Try<PID<Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.resources = "cpus:8;mem:4096";
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  TestContainerizer containerizer(&exec);
+
+  StandaloneMasterDetector detector(master.get());
+
+  MockSlave slave1(slaveFlags, &detector, &containerizer);
+  spawn(slave1);
+
+  MockScheduler sched;
+  MesosSchedulerDriver driver(
+      &sched, frameworkInfo, master.get(), DEFAULT_CREDENTIAL);
+
+  Resources unreserved = Resources::parse("cpus:8;mem:2048").get();
+  Resources reserved = unreserved.flatten(
+      frameworkInfo.role(), createReservationInfo(frameworkInfo.principal()));
+
+  // We use this to capture offers from 'resourceOffers'.
+  Future<vector<Offer>> offers;
+
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return());
+
+  EXPECT_CALL(sched, registered(&driver, _, _));
+
+  driver.start();
+
+  // In the first offer, expect an offer with unreserved resources.
+  AWAIT_READY(offers);
+
+  ASSERT_EQ(1u, offers.get().size());
+  Offer offer = offers.get()[0];
+
+  EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
+
+  Future<CheckpointResourcesMessage> checkpointResources =
+    FUTURE_PROTOBUF(CheckpointResourcesMessage(), _, _);
+
+  // We use the filter explicitly here so that the resources
+  // will not be filtered for 5 seconds (by default).
+  Filters filters;
+  filters.set_refuse_seconds(0);
+
+  // Reserve the resources.
+  driver.acceptOffers({offer.id()}, {RESERVE(reserved)}, filters);
+
+  // Wait for CheckpointResourcesMessage to be delivered.
+  AWAIT_READY(checkpointResources);
+
+  terminate(slave1);
+  wait(slave1);
+
+  // Simulate a reboot of the slave machine by modifying the boot ID.
+  ASSERT_SOME(os::write(slave::paths::getBootIdPath(
+      slave::paths::getMetaRootDir(slaveFlags.work_dir)),
+      "rebooted! ;)"));
+
+  // Change the slave resources so that it's not compatible with the
+  // checkpointed resources.
+  slaveFlags.resources = "cpus:4;mem:2048";
+
+  MockSlave slave2(slaveFlags, &detector, &containerizer);
+
+  Future<Future<Nothing>> recover;
+  EXPECT_CALL(slave2, __recover(_))
+    .WillOnce(DoAll(FutureArg<0>(&recover), Return()));
+
+  spawn(slave2);
+
+  // Wait for 'recover' to finish.
+  AWAIT_READY(recover);
+
+  // Expect 'recover' to have failed.
+  AWAIT_FAILED(recover.get());
+
+  terminate(slave2);
+  wait(slave2);
+
+  driver.stop();
+  driver.join();
+
+  Shutdown();
+}
+
 }  // namespace tests {
 }  // namespace internal {
 }  // namespace mesos {


[2/2] mesos git commit: Fixed compiler warnings in reservation tests.

Posted by ji...@apache.org.
Fixed compiler warnings in reservation tests.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9db83273
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9db83273
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9db83273

Branch: refs/heads/master
Commit: 9db83273ea668c47206db3b0cb613afd7855abcb
Parents: 96e203f
Author: Jie Yu <yu...@gmail.com>
Authored: Wed May 13 11:02:38 2015 -0700
Committer: Jie Yu <yu...@gmail.com>
Committed: Wed May 13 11:02:38 2015 -0700

----------------------------------------------------------------------
 src/tests/reservation_tests.cpp | 40 ++++++++++++++++++------------------
 1 file changed, 20 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9db83273/src/tests/reservation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/reservation_tests.cpp b/src/tests/reservation_tests.cpp
index 9020d04..755a375 100644
--- a/src/tests/reservation_tests.cpp
+++ b/src/tests/reservation_tests.cpp
@@ -125,7 +125,7 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
   // In the first offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1);
+  ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -140,7 +140,7 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
   // In the next offer, expect an offer with reserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1);
+  ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
@@ -155,7 +155,7 @@ TEST_F(ReservationTest, ReserveThenUnreserve)
   // In the next offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1);
+  ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -217,7 +217,7 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
   // In the first offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1);
+  ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -247,7 +247,7 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
   // In the next offer, expect an offer with reserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1);
+  ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
@@ -267,7 +267,7 @@ TEST_F(ReservationTest, ReserveAndLaunchThenUnreserve)
   // In the next offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1);
+  ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -336,7 +336,7 @@ TEST_F(ReservationTest, ReserveShareWithinRole)
   // In the first offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1);
+  ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -356,7 +356,7 @@ TEST_F(ReservationTest, ReserveShareWithinRole)
   // In the next offer, expect an offer with reserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1);
+  ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
@@ -381,7 +381,7 @@ TEST_F(ReservationTest, ReserveShareWithinRole)
   // framework1.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1);
+  ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(dynamicallyReserved));
@@ -448,7 +448,7 @@ TEST_F(ReservationTest, DropReserveTooLarge)
   // In the first offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1);
+  ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -476,7 +476,7 @@ TEST_F(ReservationTest, DropReserveTooLarge)
   // resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1);
+  ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -540,7 +540,7 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
   // resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1);
+  ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(staticallyReserved));
@@ -565,7 +565,7 @@ TEST_F(ReservationTest, DropReserveStaticReservation)
   // reserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1);
+  ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(staticallyReserved));
@@ -627,7 +627,7 @@ TEST_F(ReservationTest, SendingCheckpointResourcesMessage)
   // 'unreserved2'.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1u);
+  ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(unreserved1 + unreserved2));
@@ -723,7 +723,7 @@ TEST_F(ReservationTest, ResourcesCheckpointing)
   // Expect an offer with the unreserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1u);
+  ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -814,7 +814,7 @@ TEST_F(ReservationTest, MasterFailover)
   // In the first offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1u);
+  ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
   Future<CheckpointResourcesMessage> checkpointResources =
@@ -863,7 +863,7 @@ TEST_F(ReservationTest, MasterFailover)
   // In the next offer, expect an offer with the reserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1u);
+  ASSERT_EQ(1u, offers.get().size());
   offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(reserved));
@@ -933,7 +933,7 @@ TEST_F(ReservationTest, CompatibleCheckpointedResources)
   // In the first offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1u);
+  ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));
@@ -1047,7 +1047,7 @@ TEST_F(ReservationTest, CompatibleCheckpointedResourcesWithPersistentVolumes)
 
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1u);
+  ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
   EXPECT_TRUE(
@@ -1163,7 +1163,7 @@ TEST_F(ReservationTest, IncompatibleCheckpointedResources)
   // In the first offer, expect an offer with unreserved resources.
   AWAIT_READY(offers);
 
-  ASSERT_EQ(offers.get().size(), 1u);
+  ASSERT_EQ(1u, offers.get().size());
   Offer offer = offers.get()[0];
 
   EXPECT_TRUE(Resources(offer.resources()).contains(unreserved));