You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/18 22:32:52 UTC

mesos git commit: Maintenance Primitives: Fixed error in Accept/Decline inverse offers.

Repository: mesos
Updated Branches:
  refs/heads/master cc0a84790 -> e999beb18


Maintenance Primitives: Fixed error in Accept/Decline inverse offers.

Added regression test. Note that the test may be slow until inverse
offers filters are actually implemented.

Review: https://reviews.apache.org/r/38470


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e999beb1
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e999beb1
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e999beb1

Branch: refs/heads/master
Commit: e999beb18270601bebbb9131f87a0d1ed6fab37a
Parents: cc0a847
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Fri Sep 18 15:39:04 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Fri Sep 18 16:31:45 2015 -0400

----------------------------------------------------------------------
 src/master/master.cpp                  |   8 +-
 src/tests/master_maintenance_tests.cpp | 217 +++++++++++++++++++++++++++-
 2 files changed, 216 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e999beb1/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 0b61f11..ca4d587 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2821,8 +2821,8 @@ void Master::accept(
         status.set_status(mesos::master::InverseOfferStatus::ACCEPT);
 
         allocator->updateInverseOffer(
-            offer->slave_id(),
-            offer->framework_id(),
+            inverseOffer->slave_id(),
+            inverseOffer->framework_id(),
             status);
 
         removeInverseOffer(inverseOffer);
@@ -3292,8 +3292,8 @@ void Master::decline(
       status.set_status(mesos::master::InverseOfferStatus::DECLINE);
 
       allocator->updateInverseOffer(
-          offer->slave_id(),
-          offer->framework_id(),
+          inverseOffer->slave_id(),
+          inverseOffer->framework_id(),
           status);
 
       removeInverseOffer(inverseOffer);

http://git-wip-us.apache.org/repos/asf/mesos/blob/e999beb1/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index 45112f1..2a41e7e 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -122,6 +122,13 @@ public:
     unavailability = createUnavailability(Clock::now());
   }
 
+  virtual master::Flags CreateMasterFlags()
+  {
+    master::Flags masterFlags = MesosTest::CreateMasterFlags();
+    masterFlags.authenticate_frameworks = false;
+    return masterFlags;
+  }
+
   virtual slave::Flags CreateSlaveFlags()
   {
     slave::Flags slaveFlags = MesosTest::CreateSlaveFlags();
@@ -366,10 +373,7 @@ TEST_F(MasterMaintenanceTest, FailToUnscheduleDeactivatedMachines)
 // slave is scheduled to go down for maintenance.
 TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
 {
-  master::Flags flags = CreateMasterFlags();
-  flags.authenticate_frameworks = false;
-
-  Try<PID<Master>> master = StartMaster(flags);
+  Try<PID<Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   MockExecutor exec(DEFAULT_EXECUTOR_ID);
@@ -425,7 +429,7 @@ TEST_F(MasterMaintenanceTest, PendingUnavailabilityTest)
 
   // Schedule this slave for maintenance.
   MachineID machine;
-  machine.set_hostname("maintenance-host");
+  machine.set_hostname(maintenanceHostname);
   machine.set_ip(stringify(slave.get().address.ip));
 
   // TODO(jmlvanre): Replace Time(0.0) with `Clock::now()` once JSON double
@@ -1028,6 +1032,209 @@ TEST_F(MasterMaintenanceTest, MachineStatus)
   ASSERT_EQ("0.0.0.2", statuses.get().draining_machines(0).ip());
 }
 
+
+// Test ensures that accept and decline works with inverse offers.
+TEST_F(MasterMaintenanceTest, InverseOffers)
+{
+  // Set up a master.
+  Try<PID<Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  MockExecutor exec(DEFAULT_EXECUTOR_ID);
+
+  Try<PID<Slave>> slave = StartSlave(&exec);
+  ASSERT_SOME(slave);
+
+  // Before starting any frameworks, put the one machine into `DRAINING` mode.
+  MachineID machine;
+  machine.set_hostname(maintenanceHostname);
+  machine.set_ip(stringify(slave.get().address.ip));
+
+  // TODO(josephw): Replace Time(0.0) with `Clock::now()` once JSON double
+  // conversion is fixed. For now using a rounded time avoids the issue.
+  const Time start = Time::create(0.0).get() + Seconds(60);
+  const Duration duration = Seconds(120);
+  const Unavailability unavailability = createUnavailability(start, duration);
+
+  maintenance::Schedule schedule = createSchedule(
+      {createWindow({machine}, unavailability)});
+
+  Future<Response> response = process::http::post(
+      master.get(),
+      "maintenance/schedule",
+      headers,
+      stringify(JSON::Protobuf(schedule)));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  // Now start a framework.
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  Mesos mesos(
+      master.get(),
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
+
+  v1::FrameworkID id(event.get().subscribed().framework_id());
+
+  // Ensure we receive some regular resource offers.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_NE(0, event.get().offers().offers().size());
+  EXPECT_EQ(0, event.get().offers().inverse_offers().size());
+
+  // All the offers should have unavailability.
+  foreach (const v1::Offer& offer, event.get().offers().offers()) {
+    EXPECT_TRUE(offer.has_unavailability());
+  }
+
+  // Just work with a single offer to simplify the rest of the test.
+  v1::Offer offer = event.get().offers().offers(0);
+
+  EXPECT_CALL(exec, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // A dummy task just for confirming that the offer is accepted.
+  // TODO(josephw): Replace with v1 API createTask helper.
+  v1::TaskInfo taskInfo =
+    evolve(createTask(devolve(offer), "", DEFAULT_EXECUTOR_ID));
+
+  {
+    // Accept this one offer.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo);
+
+    mesos.send(call);
+  }
+
+  // Expect an inverse offer.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_EQ(0, event.get().offers().offers().size());
+  EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+
+  // Save this inverse offer so we can decline it later.
+  v1::InverseOffer inverseOffer = event.get().offers().inverse_offers(0);
+
+  // Wait for the task to start running.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
+
+  {
+    // Acknowledge TASK_RUNNING update.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(taskInfo.task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offer.agent_id());
+    acknowledge->set_uuid(event.get().update().status().uuid());
+
+    mesos.send(call);
+  }
+
+  // TODO(hartem): The filters in this test do not actually
+  // do anything, because inverse offer filters have not been
+  // implemented yet.  Instead, the accept/decline calls use
+  // the default time, which results in a slow test.
+
+  {
+    // Decline an inverse offer, with a filter.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::DECLINE);
+
+    Call::Decline* decline = call.mutable_decline();
+    decline->add_offer_ids()->CopyFrom(inverseOffer.id());
+
+    // Set a 0 second filter to immediately get another inverse offer.
+    v1::Filters filters;
+    filters.set_refuse_seconds(0);
+    decline->mutable_filters()->CopyFrom(filters);
+
+    mesos.send(call);
+  }
+
+  // Expect another inverse offer.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_EQ(0, event.get().offers().offers().size());
+  EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+  inverseOffer = event.get().offers().inverse_offers(0);
+
+  {
+    // Accept an inverse offer, with filter.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(inverseOffer.id());
+
+    // Set a 0 second filter to immediately get another inverse offer.
+    v1::Filters filters;
+    filters.set_refuse_seconds(0);
+    accept->mutable_filters()->CopyFrom(filters);
+
+    mesos.send(call);
+  }
+
+  // Expect yet another inverse offer.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_EQ(0, event.get().offers().offers().size());
+  EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+
+  EXPECT_CALL(exec, shutdown(_))
+    .Times(AtMost(1));
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {