You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/20 20:37:38 UTC

[4/5] mesos git commit: Maintenance Primitives: Added test for inverse offer filters.

Maintenance Primitives: Added test for inverse offer filters.

Checks that filters change which inverse offer is sent when.

Review: https://reviews.apache.org/r/38475


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/31defd41
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/31defd41
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/31defd41

Branch: refs/heads/master
Commit: 31defd41bae4670247aa5b92f799234bcad9200b
Parents: 9d03297
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Sat Sep 19 14:24:44 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Sun Sep 20 14:21:22 2015 -0400

----------------------------------------------------------------------
 src/tests/master_maintenance_tests.cpp | 325 +++++++++++++++++++++++++++-
 1 file changed, 320 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/31defd41/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index 9892bc3..c5277a1 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -51,6 +51,7 @@
 
 #include "slave/flags.hpp"
 
+#include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"
 #include "tests/utils.hpp"
 
@@ -83,6 +84,7 @@ using std::vector;
 
 using testing::AtMost;
 using testing::DoAll;
+using testing::Not;
 
 namespace mesos {
 namespace internal {
@@ -1167,11 +1169,6 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
     mesos.send(call);
   }
 
-  // TODO(hartem): The filters in this test do not actually
-  // do anything, because inverse offer filters have not been
-  // implemented yet.  Instead, the accept/decline calls use
-  // the default time, which results in a slow test.
-
   {
     // Decline an inverse offer, with a filter.
     Call call;
@@ -1227,6 +1224,324 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
   Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
 }
 
+
+// Test ensures that inverse offers support filters.
+TEST_F(MasterMaintenanceTest, InverseOffersFilters)
+{
+  // Set up a master.
+  // NOTE: We don't use `StartMaster()` because we need to access these flags.
+  master::Flags flags = CreateMasterFlags();
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  ExecutorInfo executor1 = CREATE_EXECUTOR_INFO("executor-1", "exit 1");
+  ExecutorInfo executor2 = CREATE_EXECUTOR_INFO("executor-2", "exit 2");
+
+  MockExecutor exec1(executor1.executor_id());
+  MockExecutor exec2(executor2.executor_id());
+
+  hashmap<ExecutorID, Executor*> execs;
+  execs[executor1.executor_id()] = &exec1;
+  execs[executor2.executor_id()] = &exec2;
+
+  TestContainerizer containerizer(execs);
+
+  EXPECT_CALL(exec1, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec1, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  EXPECT_CALL(exec2, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec2, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Capture the registration message for the first slave.
+  Future<SlaveRegisteredMessage> slave1RegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), _);
+
+  // We need two agents for this test.
+  Try<PID<Slave>> slave1 = StartSlave(&containerizer);
+  ASSERT_SOME(slave1);
+
+  // We need to make sure the first slave registers before we schedule the
+  // machine it is running on for maintenance.
+  AWAIT_READY(slave1RegisteredMessage);
+
+  // Capture the registration message for the second slave.
+  Future<SlaveRegisteredMessage> slave2RegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), Not(slave1.get()));
+
+  slave::Flags slaveFlags2 = MesosTest::CreateSlaveFlags();
+  slaveFlags2.hostname = maintenanceHostname + "-2";
+
+  Try<PID<Slave>> slave2 = StartSlave(&containerizer, slaveFlags2);
+  ASSERT_SOME(slave2);
+
+  // We need to make sure the second slave registers before we schedule the
+  // machine it is running on for maintenance.
+  AWAIT_READY(slave2RegisteredMessage);
+
+  // Before starting any frameworks, put the first machine into `DRAINING` mode.
+  MachineID machine1;
+  machine1.set_hostname(maintenanceHostname);
+  machine1.set_ip(stringify(slave1.get().address.ip));
+
+  MachineID machine2;
+  machine2.set_hostname(slaveFlags2.hostname.get());
+  machine2.set_ip(stringify(slave2.get().address.ip));
+
+  const Time start = Clock::now() + Seconds(60);
+  const Duration duration = Seconds(120);
+  const Unavailability unavailability = createUnavailability(start, duration);
+
+  maintenance::Schedule schedule = createSchedule(
+      {createWindow({machine1, machine2}, unavailability)});
+
+  Future<Response> response = process::http::post(
+      master.get(),
+      "maintenance/schedule",
+      headers,
+      stringify(JSON::Protobuf(schedule)));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  // Pause the clock before starting a framework.
+  // This ensures deterministic offer-ing behavior during the test.
+  Clock::pause();
+
+  // Now start a framework.
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  Mesos mesos(
+      master.get(),
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
+
+  v1::FrameworkID id(event.get().subscribed().framework_id());
+
+  // Trigger a batch allocation.
+  Clock::advance(flags.allocation_interval);
+  Clock::settle();
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_EQ(2, event.get().offers().offers().size());
+  EXPECT_EQ(0, event.get().offers().inverse_offers().size());
+
+  // All the offers should have unavailability.
+  foreach (const v1::Offer& offer, event.get().offers().offers()) {
+    EXPECT_TRUE(offer.has_unavailability());
+  }
+
+  // Save both offers.
+  v1::Offer offer1 = event.get().offers().offers(0);
+  v1::Offer offer2 = event.get().offers().offers(1);
+
+  // Spawn dummy tasks using both offers.
+  v1::TaskInfo taskInfo1 =
+    evolve(createTask(devolve(offer1), "exit 1", executor1.executor_id()));
+
+  v1::TaskInfo taskInfo2 =
+    evolve(createTask(devolve(offer2), "exit 2", executor2.executor_id()));
+
+    sleep(2);
+
+  {
+    // Accept the first offer.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer1.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo1);
+
+    mesos.send(call);
+  }
+
+  {
+    // Accept the second offer.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer2.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo2);
+
+    mesos.send(call);
+  }
+
+  // The order of events is deterministic from here on.
+  Clock::resume();
+
+  // Expect two inverse offers.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_EQ(0, event.get().offers().offers().size());
+  EXPECT_EQ(2, event.get().offers().inverse_offers().size());
+
+  // Save these inverse offers.
+  v1::InverseOffer inverseOffer1 = event.get().offers().inverse_offers(0);
+  v1::InverseOffer inverseOffer2 = event.get().offers().inverse_offers(1);
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
+
+  {
+    // Acknowledge TASK_RUNNING update for one task.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(taskInfo1.task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offer1.agent_id());
+    acknowledge->set_uuid(event.get().update().status().uuid());
+
+    mesos.send(call);
+  }
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
+
+  {
+    // Acknowledge TASK_RUNNING update for the other task.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(taskInfo2.task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offer2.agent_id());
+    acknowledge->set_uuid(event.get().update().status().uuid());
+
+    mesos.send(call);
+  }
+
+  {
+    // Decline the second inverse offer, with a filter set such that we
+    // should not see this inverse offer in the next allocation.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::DECLINE);
+
+    Call::Decline* decline = call.mutable_decline();
+    decline->add_offer_ids()->CopyFrom(inverseOffer2.id());
+
+    v1::Filters filters;
+    filters.set_refuse_seconds(flags.allocation_interval.secs() + 1);
+    decline->mutable_filters()->CopyFrom(filters);
+
+    mesos.send(call);
+  }
+
+  {
+    // Accept the first inverse offer, with a filter set such that we
+    // should immediately see this inverse offer again.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(inverseOffer1.id());
+
+    v1::Filters filters;
+    filters.set_refuse_seconds(0);
+    accept->mutable_filters()->CopyFrom(filters);
+
+    mesos.send(call);
+  }
+
+  // Expect one inverse offer.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_EQ(0, event.get().offers().offers().size());
+  EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+  EXPECT_EQ(
+      inverseOffer1.agent_id(),
+      event.get().offers().inverse_offers(0).agent_id());
+
+  inverseOffer1 = event.get().offers().inverse_offers(0);
+
+  {
+    // Do another immediate filter, but decline it this time.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::DECLINE);
+
+    Call::Decline* decline = call.mutable_decline();
+    decline->add_offer_ids()->CopyFrom(inverseOffer1.id());
+
+    v1::Filters filters;
+    filters.set_refuse_seconds(0);
+    decline->mutable_filters()->CopyFrom(filters);
+
+    mesos.send(call);
+  }
+
+  // Expect the same inverse offer.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_EQ(0, event.get().offers().offers().size());
+  EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+  EXPECT_EQ(
+      inverseOffer1.agent_id(),
+      event.get().offers().inverse_offers(0).agent_id());
+
+  EXPECT_CALL(exec1, shutdown(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(exec2, shutdown(_))
+    .Times(AtMost(1));
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {