You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by jo...@apache.org on 2015/09/20 20:37:35 UTC

[1/5] mesos git commit: Renamed Filter to OfferFilter.

Repository: mesos
Updated Branches:
  refs/heads/master e91e45540 -> a92ff3cd7


Renamed Filter to OfferFilter.

Review: https://reviews.apache.org/r/38244


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f3883949
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f3883949
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f3883949

Branch: refs/heads/master
Commit: f38839494edf62b2a16196506390f7d0add0a8cb
Parents: e91e455
Author: Artem Harutyunyan <ar...@mesosphere.io>
Authored: Sat Sep 19 14:18:33 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Sun Sep 20 14:21:03 2015 -0400

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.hpp | 83 +++++++++++++-----------
 1 file changed, 46 insertions(+), 37 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f3883949/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 8f2232a..e944120 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -51,7 +51,7 @@ namespace master {
 namespace allocator {
 
 // Forward declarations.
-class Filter;
+class OfferFilter;
 
 
 // We forward declare the hierarchical allocator process so that we
@@ -195,12 +195,12 @@ protected:
   void expire(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
-      Filter* filter);
+      OfferFilter* offerFilter);
 
   // Checks whether the slave is whitelisted.
   bool isWhitelisted(const SlaveID& slaveId);
 
-  // Returns true if there is a filter for this framework
+  // Returns true if there is a resource offer filter for this framework
   // on this slave.
   bool isFiltered(
       const FrameworkID& frameworkId,
@@ -250,8 +250,8 @@ protected:
     // Whether the framework desires revocable resources.
     bool revocable;
 
-    // Active filters for the framework.
-    hashmap<SlaveID, hashset<Filter*>> filters;
+    // Active filters on offers for the framework.
+    hashmap<SlaveID, hashset<OfferFilter*>> offerFilters;
   };
 
   double _event_queue_dispatches()
@@ -291,7 +291,7 @@ protected:
     // slave, and the responses from frameworks as to whether they will be able
     // to gracefully handle this unavailability.
     // NOTE: We currently implement maintenance in the allocator to be able to
-    // leverage state and features such as the FrameworkSorter and Filters.
+    // leverage state and features such as the FrameworkSorter and OfferFilter.
     struct Maintenance
     {
       Maintenance(const Unavailability& _unavailability)
@@ -348,19 +348,19 @@ protected:
 
 
 // Used to represent "filters" for resources unused in offers.
-class Filter
+class OfferFilter
 {
 public:
-  virtual ~Filter() {}
+  virtual ~OfferFilter() {}
 
   virtual bool filter(const Resources& resources) = 0;
 };
 
 
-class RefusedFilter: public Filter
+class RefusedOfferFilter: public OfferFilter
 {
 public:
-  RefusedFilter(
+  RefusedOfferFilter(
       const Resources& _resources,
       const process::Timeout& _timeout)
     : resources(_resources), timeout(_timeout) {}
@@ -490,7 +490,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::removeFramework(
   }
 
   // Do not delete the filters contained in this
-  // framework's 'filters' hashset yet, see comments in
+  // framework's `offerFilters` hashset yet, see comments in
   // HierarchicalAllocatorProcess::reviveOffers and
   // HierarchicalAllocatorProcess::expire.
   frameworks.erase(frameworkId);
@@ -536,10 +536,10 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateFramework(
   // the added/removed and activated/deactivated in the future.
 
   // Do not delete the filters contained in this
-  // framework's 'filters' hashset yet, see comments in
+  // framework's `offerFilters` hashset yet, see comments in
   // HierarchicalAllocatorProcess::reviveOffers and
   // HierarchicalAllocatorProcess::expire.
-  frameworks[frameworkId].filters.clear();
+  frameworks[frameworkId].offerFilters.clear();
 
   LOG(INFO) << "Deactivated framework " << frameworkId;
 }
@@ -611,7 +611,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::addSlave(
   slaves[slaveId].hostname = slaveInfo.hostname();
 
   // NOTE: We currently implement maintenance in the allocator to be able to
-  // leverage state and features such as the FrameworkSorter and Filters.
+  // leverage state and features such as the FrameworkSorter and OfferFilter.
   if (unavailability.isSome()) {
     slaves[slaveId].maintenance =
       typename Slave::Maintenance(unavailability.get());
@@ -857,7 +857,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability(
   CHECK(slaves.contains(slaveId));
 
   // NOTE: We currently implement maintenance in the allocator to be able to
-  // leverage state and features such as the FrameworkSorter and Filters.
+  // leverage state and features such as the FrameworkSorter and OfferFilter.
 
   // Remove any old unavailability.
   slaves[slaveId].maintenance = None();
@@ -885,7 +885,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
   CHECK(slaves[slaveId].maintenance.isSome());
 
   // NOTE: We currently implement maintenance in the allocator to be able to
-  // leverage state and features such as the FrameworkSorter and Filters.
+  // leverage state and features such as the FrameworkSorter and OfferFilter.
 
   // We use a reference by alias because we intend to modify the
   // `maintenance` and to improve readability.
@@ -1001,13 +1001,19 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
             << " for " << seconds.get();
 
     // Create a new filter and delay its expiration.
-    Filter* filter = new RefusedFilter(
+    OfferFilter* offerFilter = new RefusedOfferFilter(
         resources,
         process::Timeout::in(seconds.get()));
 
-    frameworks[frameworkId].filters[slaveId].insert(filter);
+    frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter);
 
-    delay(seconds.get(), self(), &Self::expire, frameworkId, slaveId, filter);
+    delay(
+        seconds.get(),
+        self(),
+        &Self::expire,
+        frameworkId,
+        slaveId,
+        offerFilter);
   }
 }
 
@@ -1029,17 +1035,17 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::reviveOffers(
 {
   CHECK(initialized);
 
-  frameworks[frameworkId].filters.clear();
+  frameworks[frameworkId].offerFilters.clear();
   frameworks[frameworkId].quiesced = false;
 
-  // We delete each actual Filter when
-  // HierarchicalAllocatorProcess::expire gets invoked. If we delete the
-  // Filter here it's possible that the same Filter (i.e., same
-  // address) could get reused and HierarchicalAllocatorProcess::expire
+  // We delete each actual `OfferFilter` when
+  // `HierarchicalAllocatorProcess::expire` gets invoked. If we delete the
+  // `OfferFilter` here it's possible that the same `OfferFilter` (i.e., same
+  // address) could get reused and `HierarchicalAllocatorProcess::expire`
   // would expire that filter too soon. Note that this only works
   // right now because ALL Filter types "expire".
 
-  LOG(INFO) << "Removed filters for framework " << frameworkId;
+  LOG(INFO) << "Removed offer filters for framework " << frameworkId;
 
   allocate();
 }
@@ -1266,7 +1272,7 @@ void
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
     const FrameworkID& frameworkId,
     const SlaveID& slaveId,
-    Filter* filter)
+    OfferFilter* offerFilter)
 {
   // The filter might have already been removed (e.g., if the
   // framework no longer exists or in
@@ -1274,15 +1280,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
   // keep the address from getting reused possibly causing premature
   // expiration).
   if (frameworks.contains(frameworkId) &&
-      frameworks[frameworkId].filters.contains(slaveId) &&
-      frameworks[frameworkId].filters[slaveId].contains(filter)) {
-    frameworks[frameworkId].filters[slaveId].erase(filter);
-    if (frameworks[frameworkId].filters[slaveId].empty()) {
-      frameworks[frameworkId].filters.erase(slaveId);
+      frameworks[frameworkId].offerFilters.contains(slaveId) &&
+      frameworks[frameworkId].offerFilters[slaveId].contains(offerFilter)) {
+    frameworks[frameworkId].offerFilters[slaveId].erase(offerFilter);
+    if (frameworks[frameworkId].offerFilters[slaveId].empty()) {
+      frameworks[frameworkId].offerFilters.erase(slaveId);
     }
   }
 
-  delete filter;
+  delete offerFilter;
 }
 
 
@@ -1312,18 +1318,21 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
   // framework. This is a short term fix until the following is resolved:
   // https://issues.apache.org/jira/browse/MESOS-444.
   if (frameworks[frameworkId].checkpoint && !slaves[slaveId].checkpoint) {
-    VLOG(1) << "Filtered " << resources
+    VLOG(1) << "Filtered offer with " << resources
             << " on non-checkpointing slave " << slaveId
             << " for checkpointing framework " << frameworkId;
+
     return true;
   }
 
-  if (frameworks[frameworkId].filters.contains(slaveId)) {
-    foreach (Filter* filter, frameworks[frameworkId].filters[slaveId]) {
-      if (filter->filter(resources)) {
-        VLOG(1) << "Filtered " << resources
+  if (frameworks[frameworkId].offerFilters.contains(slaveId)) {
+    foreach (
+      OfferFilter* offerFilter, frameworks[frameworkId].offerFilters[slaveId]) {
+      if (offerFilter->filter(resources)) {
+        VLOG(1) << "Filtered offer with " << resources
                 << " on slave " << slaveId
                 << " for framework " << frameworkId;
+
         return true;
       }
     }


[4/5] mesos git commit: Maintenance Primitives: Added test for inverse offer filters.

Posted by jo...@apache.org.
Maintenance Primitives: Added test for inverse offer filters.

Checks that filters change which inverse offer is sent when.

Review: https://reviews.apache.org/r/38475


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/31defd41
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/31defd41
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/31defd41

Branch: refs/heads/master
Commit: 31defd41bae4670247aa5b92f799234bcad9200b
Parents: 9d03297
Author: Joseph Wu <jo...@mesosphere.io>
Authored: Sat Sep 19 14:24:44 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Sun Sep 20 14:21:22 2015 -0400

----------------------------------------------------------------------
 src/tests/master_maintenance_tests.cpp | 325 +++++++++++++++++++++++++++-
 1 file changed, 320 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/31defd41/src/tests/master_maintenance_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_maintenance_tests.cpp b/src/tests/master_maintenance_tests.cpp
index 9892bc3..c5277a1 100644
--- a/src/tests/master_maintenance_tests.cpp
+++ b/src/tests/master_maintenance_tests.cpp
@@ -51,6 +51,7 @@
 
 #include "slave/flags.hpp"
 
+#include "tests/containerizer.hpp"
 #include "tests/mesos.hpp"
 #include "tests/utils.hpp"
 
@@ -83,6 +84,7 @@ using std::vector;
 
 using testing::AtMost;
 using testing::DoAll;
+using testing::Not;
 
 namespace mesos {
 namespace internal {
@@ -1167,11 +1169,6 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
     mesos.send(call);
   }
 
-  // TODO(hartem): The filters in this test do not actually
-  // do anything, because inverse offer filters have not been
-  // implemented yet.  Instead, the accept/decline calls use
-  // the default time, which results in a slow test.
-
   {
     // Decline an inverse offer, with a filter.
     Call call;
@@ -1227,6 +1224,324 @@ TEST_F(MasterMaintenanceTest, InverseOffers)
   Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
 }
 
+
+// Test ensures that inverse offers support filters.
+TEST_F(MasterMaintenanceTest, InverseOffersFilters)
+{
+  // Set up a master.
+  // NOTE: We pass explicit flags to `StartMaster()` so we can read them below.
+  master::Flags flags = CreateMasterFlags();
+
+  Try<PID<Master>> master = StartMaster(flags);
+  ASSERT_SOME(master);
+
+  ExecutorInfo executor1 = CREATE_EXECUTOR_INFO("executor-1", "exit 1");
+  ExecutorInfo executor2 = CREATE_EXECUTOR_INFO("executor-2", "exit 2");
+
+  MockExecutor exec1(executor1.executor_id());
+  MockExecutor exec2(executor2.executor_id());
+
+  hashmap<ExecutorID, Executor*> execs;
+  execs[executor1.executor_id()] = &exec1;
+  execs[executor2.executor_id()] = &exec2;
+
+  TestContainerizer containerizer(execs);
+
+  EXPECT_CALL(exec1, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec1, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  EXPECT_CALL(exec2, registered(_, _, _, _))
+    .Times(1);
+
+  EXPECT_CALL(exec2, launchTask(_, _))
+    .WillOnce(SendStatusUpdateFromTask(TASK_RUNNING));
+
+  // Capture the registration message for the first slave.
+  Future<SlaveRegisteredMessage> slave1RegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), _);
+
+  // We need two agents for this test.
+  Try<PID<Slave>> slave1 = StartSlave(&containerizer);
+  ASSERT_SOME(slave1);
+
+  // We need to make sure the first slave registers before we schedule the
+  // machine it is running on for maintenance.
+  AWAIT_READY(slave1RegisteredMessage);
+
+  // Capture the registration message for the second slave.
+  Future<SlaveRegisteredMessage> slave2RegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get(), Not(slave1.get()));
+
+  slave::Flags slaveFlags2 = MesosTest::CreateSlaveFlags();
+  slaveFlags2.hostname = maintenanceHostname + "-2";
+
+  Try<PID<Slave>> slave2 = StartSlave(&containerizer, slaveFlags2);
+  ASSERT_SOME(slave2);
+
+  // We need to make sure the second slave registers before we schedule the
+  // machine it is running on for maintenance.
+  AWAIT_READY(slave2RegisteredMessage);
+
+  // Before starting any frameworks, put both machines into `DRAINING` mode.
+  MachineID machine1;
+  machine1.set_hostname(maintenanceHostname);
+  machine1.set_ip(stringify(slave1.get().address.ip));
+
+  MachineID machine2;
+  machine2.set_hostname(slaveFlags2.hostname.get());
+  machine2.set_ip(stringify(slave2.get().address.ip));
+
+  const Time start = Clock::now() + Seconds(60);
+  const Duration duration = Seconds(120);
+  const Unavailability unavailability = createUnavailability(start, duration);
+
+  maintenance::Schedule schedule = createSchedule(
+      {createWindow({machine1, machine2}, unavailability)});
+
+  Future<Response> response = process::http::post(
+      master.get(),
+      "maintenance/schedule",
+      headers,
+      stringify(JSON::Protobuf(schedule)));
+
+  AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+  // Pause the clock before starting a framework.
+  // This ensures deterministic offer-ing behavior during the test.
+  Clock::pause();
+
+  // Now start a framework.
+  Callbacks callbacks;
+
+  Future<Nothing> connected;
+  EXPECT_CALL(callbacks, connected())
+    .WillOnce(FutureSatisfy(&connected));
+
+  Mesos mesos(
+      master.get(),
+      lambda::bind(&Callbacks::connected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::disconnected, lambda::ref(callbacks)),
+      lambda::bind(&Callbacks::received, lambda::ref(callbacks), lambda::_1));
+
+  AWAIT_READY(connected);
+
+  Queue<Event> events;
+
+  EXPECT_CALL(callbacks, received(_))
+    .WillRepeatedly(Enqueue(&events));
+
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(DEFAULT_V1_FRAMEWORK_INFO);
+
+    mesos.send(call);
+  }
+
+  Future<Event> event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::SUBSCRIBED, event.get().type());
+
+  v1::FrameworkID id(event.get().subscribed().framework_id());
+
+  // Trigger a batch allocation.
+  Clock::advance(flags.allocation_interval);
+  Clock::settle();
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_EQ(2, event.get().offers().offers().size());
+  EXPECT_EQ(0, event.get().offers().inverse_offers().size());
+
+  // All the offers should have unavailability.
+  foreach (const v1::Offer& offer, event.get().offers().offers()) {
+    EXPECT_TRUE(offer.has_unavailability());
+  }
+
+  // Save both offers.
+  v1::Offer offer1 = event.get().offers().offers(0);
+  v1::Offer offer2 = event.get().offers().offers(1);
+
+  // Spawn dummy tasks using both offers.
+  v1::TaskInfo taskInfo1 =
+    evolve(createTask(devolve(offer1), "exit 1", executor1.executor_id()));
+
+  v1::TaskInfo taskInfo2 =
+    evolve(createTask(devolve(offer2), "exit 2", executor2.executor_id()));
+
+    sleep(2);
+
+  {
+    // Accept the first offer.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer1.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo1);
+
+    mesos.send(call);
+  }
+
+  {
+    // Accept the second offer.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(offer2.id());
+
+    v1::Offer::Operation* operation = accept->add_operations();
+    operation->set_type(v1::Offer::Operation::LAUNCH);
+    operation->mutable_launch()->add_task_infos()->CopyFrom(taskInfo2);
+
+    mesos.send(call);
+  }
+
+  // The order of events is deterministic from here on.
+  Clock::resume();
+
+  // Expect two inverse offers.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_EQ(0, event.get().offers().offers().size());
+  EXPECT_EQ(2, event.get().offers().inverse_offers().size());
+
+  // Save these inverse offers.
+  v1::InverseOffer inverseOffer1 = event.get().offers().inverse_offers(0);
+  v1::InverseOffer inverseOffer2 = event.get().offers().inverse_offers(1);
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
+
+  {
+    // Acknowledge TASK_RUNNING update for one task.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(taskInfo1.task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offer1.agent_id());
+    acknowledge->set_uuid(event.get().update().status().uuid());
+
+    mesos.send(call);
+  }
+
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::UPDATE, event.get().type());
+  EXPECT_EQ(v1::TASK_RUNNING, event.get().update().status().state());
+
+  {
+    // Acknowledge TASK_RUNNING update for the other task.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACKNOWLEDGE);
+
+    Call::Acknowledge* acknowledge = call.mutable_acknowledge();
+    acknowledge->mutable_task_id()->CopyFrom(taskInfo2.task_id());
+    acknowledge->mutable_agent_id()->CopyFrom(offer2.agent_id());
+    acknowledge->set_uuid(event.get().update().status().uuid());
+
+    mesos.send(call);
+  }
+
+  {
+    // Decline the second inverse offer, with a filter set such that we
+    // should not see this inverse offer in the next allocation.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::DECLINE);
+
+    Call::Decline* decline = call.mutable_decline();
+    decline->add_offer_ids()->CopyFrom(inverseOffer2.id());
+
+    v1::Filters filters;
+    filters.set_refuse_seconds(flags.allocation_interval.secs() + 1);
+    decline->mutable_filters()->CopyFrom(filters);
+
+    mesos.send(call);
+  }
+
+  {
+    // Accept the first inverse offer, with a filter set such that we
+    // should immediately see this inverse offer again.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::ACCEPT);
+
+    Call::Accept* accept = call.mutable_accept();
+    accept->add_offer_ids()->CopyFrom(inverseOffer1.id());
+
+    v1::Filters filters;
+    filters.set_refuse_seconds(0);
+    accept->mutable_filters()->CopyFrom(filters);
+
+    mesos.send(call);
+  }
+
+  // Expect one inverse offer.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_EQ(0, event.get().offers().offers().size());
+  EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+  EXPECT_EQ(
+      inverseOffer1.agent_id(),
+      event.get().offers().inverse_offers(0).agent_id());
+
+  inverseOffer1 = event.get().offers().inverse_offers(0);
+
+  {
+    // Do another immediate filter, but decline it this time.
+    Call call;
+    call.mutable_framework_id()->CopyFrom(id);
+    call.set_type(Call::DECLINE);
+
+    Call::Decline* decline = call.mutable_decline();
+    decline->add_offer_ids()->CopyFrom(inverseOffer1.id());
+
+    v1::Filters filters;
+    filters.set_refuse_seconds(0);
+    decline->mutable_filters()->CopyFrom(filters);
+
+    mesos.send(call);
+  }
+
+  // Expect the same inverse offer.
+  event = events.get();
+  AWAIT_READY(event);
+  EXPECT_EQ(Event::OFFERS, event.get().type());
+  EXPECT_EQ(0, event.get().offers().offers().size());
+  EXPECT_EQ(1, event.get().offers().inverse_offers().size());
+  EXPECT_EQ(
+      inverseOffer1.agent_id(),
+      event.get().offers().inverse_offers(0).agent_id());
+
+  EXPECT_CALL(exec1, shutdown(_))
+    .Times(AtMost(1));
+
+  EXPECT_CALL(exec2, shutdown(_))
+    .Times(AtMost(1));
+
+  Shutdown(); // Must shutdown before 'containerizer' gets deallocated.
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[5/5] mesos git commit: Synchronized global environment manipulation in TestContainerizer.

Posted by jo...@apache.org.
Synchronized global environment manipulation in TestContainerizer.

This fixes non-deterministic behavior in the TestContainerizer. There
is still an open issue mentioned in the comments and documented in
MESOS-3475.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a92ff3cd
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a92ff3cd
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a92ff3cd

Branch: refs/heads/master
Commit: a92ff3cd7388cfcf948e4ffa3dabcad98a29e3a8
Parents: 31defd4
Author: Joris Van Remoortere <jo...@gmail.com>
Authored: Sun Sep 20 14:04:42 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Sun Sep 20 14:21:28 2015 -0400

----------------------------------------------------------------------
 src/tests/containerizer.cpp | 105 +++++++++++++++++++++++----------------
 1 file changed, 62 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a92ff3cd/src/tests/containerizer.cpp
----------------------------------------------------------------------
diff --git a/src/tests/containerizer.cpp b/src/tests/containerizer.cpp
index 5134e63..1f74315 100644
--- a/src/tests/containerizer.cpp
+++ b/src/tests/containerizer.cpp
@@ -17,6 +17,11 @@
  */
 
 #include "tests/containerizer.hpp"
+
+#include <mutex>
+
+#include "stout/synchronized.hpp"
+
 #include "tests/mesos.hpp"
 
 using std::map;
@@ -99,56 +104,70 @@ Future<bool> TestContainerizer::_launch(
   containers_[key] = containerId;
 
   Executor* executor = executors[executorInfo.executor_id()];
-  Owned<MesosExecutorDriver> driver(new MesosExecutorDriver(executor));
-  drivers[containerId] = driver;
-
-  // Prepare additional environment variables for the executor.
-  // TODO(benh): Need to get flags passed into the TestContainerizer
-  // in order to properly use here.
-  slave::Flags flags;
-  flags.recovery_timeout = Duration::zero();
-
-  // We need to save the original set of environment variables so we
-  // can reset the environment after calling 'driver->start()' below.
-  hashmap<string, string> original = os::environment();
-
-  const map<string, string> environment = executorEnvironment(
-      executorInfo,
-      directory,
-      slaveId,
-      slavePid,
-      checkpoint,
-      flags);
 
-  foreachpair (const string& name, const string variable, environment) {
-    os::setenv(name, variable);
-  }
+  // We need to synchronize all reads and writes to the environment as this is
+  // global state.
+  // TODO(jmlvanre): Even this is not sufficient, as other aspects of the code
+  // may read an environment variable while we are manipulating it. The better
+  // solution is to pass the environment variables into the fork, or to set them
+  // on the command line. See MESOS-3475.
+  static std::mutex mutex;
+
+  synchronized(mutex) {
+    // Since the constructor for `MesosExecutorDriver` reads environment
+    // variables to load flags, even it needs to be within this synchronization
+    // section.
+    Owned<MesosExecutorDriver> driver(new MesosExecutorDriver(executor));
+    drivers[containerId] = driver;
+
+    // Prepare additional environment variables for the executor.
+    // TODO(benh): Need to get flags passed into the TestContainerizer
+    // in order to properly use here.
+    slave::Flags flags;
+    flags.recovery_timeout = Duration::zero();
+
+    // We need to save the original set of environment variables so we
+    // can reset the environment after calling 'driver->start()' below.
+    hashmap<string, string> original = os::environment();
+
+    const map<string, string> environment = executorEnvironment(
+        executorInfo,
+        directory,
+        slaveId,
+        slavePid,
+        checkpoint,
+        flags);
+
+    foreachpair (const string& name, const string variable, environment) {
+      os::setenv(name, variable);
+    }
 
-  // TODO(benh): Can this be removed and done exlusively in the
-  // 'executorEnvironment()' function? There are other places in the
-  // code where we do this as well and it's likely we can do this once
-  // in 'executorEnvironment()'.
-  foreach (const Environment::Variable& variable,
-           executorInfo.command().environment().variables()) {
-    os::setenv(variable.name(), variable.value());
-  }
+    // TODO(benh): Can this be removed and done exclusively in the
+    // 'executorEnvironment()' function? There are other places in the
+    // code where we do this as well and it's likely we can do this once
+    // in 'executorEnvironment()'.
+    foreach (const Environment::Variable& variable,
+            executorInfo.command().environment().variables()) {
+      os::setenv(variable.name(), variable.value());
+    }
 
-  os::setenv("MESOS_LOCAL", "1");
+    os::setenv("MESOS_LOCAL", "1");
 
-  driver->start();
+    driver->start();
 
-  os::unsetenv("MESOS_LOCAL");
+    os::unsetenv("MESOS_LOCAL");
 
-  // Unset the environment variables we set by resetting them to their
-  // original values and also removing any that were not part of the
-  // original environment.
-  foreachpair (const string& name, const string& value, original) {
-    os::setenv(name, value);
-  }
+    // Unset the environment variables we set by resetting them to their
+    // original values and also removing any that were not part of the
+    // original environment.
+    foreachpair (const string& name, const string& value, original) {
+      os::setenv(name, value);
+    }
 
-  foreachkey (const string& name, environment) {
-    if (!original.contains(name)) {
-      os::unsetenv(name);
+    foreachkey (const string& name, environment) {
+      if (!original.contains(name)) {
+        os::unsetenv(name);
+      }
     }
   }
 


[3/5] mesos git commit: Added filter support for Inverse Offers.

Posted by jo...@apache.org.
Added filter support for Inverse Offers.

Review: https://reviews.apache.org/r/38324


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9d03297a
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9d03297a
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9d03297a

Branch: refs/heads/master
Commit: 9d03297a9064dcde3ec920db4ef66003b4d323da
Parents: eec3fec
Author: Artem Harutyunyan <ar...@mesosphere.io>
Authored: Sat Sep 19 14:24:35 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Sun Sep 20 14:21:15 2015 -0400

----------------------------------------------------------------------
 include/mesos/master/allocator.hpp          |   3 +-
 src/master/allocator/mesos/allocator.hpp    |  12 +-
 src/master/allocator/mesos/hierarchical.hpp | 205 +++++++++++++++++++++--
 src/master/master.cpp                       |   6 +-
 src/tests/mesos.hpp                         |  11 +-
 5 files changed, 215 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 7301058..3fea47f 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -160,7 +160,8 @@ public:
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
       const Option<UnavailableResources>& unavailableResources,
-      const Option<InverseOfferStatus>& status) = 0;
+      const Option<InverseOfferStatus>& status,
+      const Option<Filters>& filters = None()) = 0;
 
   // Informs the Allocator to recover resources that are considered
   // used by the framework.

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 4f02dd1..904dc62 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -120,7 +120,8 @@ public:
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
       const Option<UnavailableResources>& unavailableResources,
-      const Option<mesos::master::InverseOfferStatus>& status);
+      const Option<mesos::master::InverseOfferStatus>& status,
+      const Option<Filters>& filters);
 
   void recoverResources(
       const FrameworkID& frameworkId,
@@ -228,7 +229,8 @@ public:
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
       const Option<UnavailableResources>& unavailableResources,
-      const Option<mesos::master::InverseOfferStatus>& status) = 0;
+      const Option<mesos::master::InverseOfferStatus>& status,
+      const Option<Filters>& filters = None()) = 0;
 
   virtual void recoverResources(
       const FrameworkID& frameworkId,
@@ -489,7 +491,8 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
     const SlaveID& slaveId,
     const FrameworkID& frameworkId,
     const Option<UnavailableResources>& unavailableResources,
-    const Option<mesos::master::InverseOfferStatus>& status)
+    const Option<mesos::master::InverseOfferStatus>& status,
+    const Option<Filters>& filters)
 {
   return process::dispatch(
       process,
@@ -497,7 +500,8 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
       slaveId,
       frameworkId,
       unavailableResources,
-      status);
+      status,
+      filters);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index a4c4107..d3496bc 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -52,7 +52,7 @@ namespace allocator {
 
 // Forward declarations.
 class OfferFilter;
-
+class InverseOfferFilter;
 
 // We forward declare the hierarchical allocator process so that we
 // can typedef an instantiation of it with DRF sorters.
@@ -158,7 +158,8 @@ public:
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
       const Option<UnavailableResources>& unavailableResources,
-      const Option<mesos::master::InverseOfferStatus>& status);
+      const Option<mesos::master::InverseOfferStatus>& status,
+      const Option<Filters>& filters);
 
   void recoverResources(
       const FrameworkID& frameworkId,
@@ -192,12 +193,18 @@ protected:
   // Send inverse offers from the specified slaves.
   void deallocate(const hashset<SlaveID>& slaveIds);
 
-  // Remove a filter for the specified framework.
+  // Remove an offer filter for the specified framework.
   void expire(
       const FrameworkID& frameworkId,
       const SlaveID& slaveId,
       OfferFilter* offerFilter);
 
+  // Remove an inverse offer filter for the specified framework.
+  void expire(
+      const FrameworkID& frameworkId,
+      const SlaveID& slaveId,
+      InverseOfferFilter* inverseOfferFilter);
+
   // Checks whether the slave is whitelisted.
   bool isWhitelisted(const SlaveID& slaveId);
 
@@ -208,6 +215,12 @@ protected:
       const SlaveID& slaveId,
       const Resources& resources);
 
+  // Returns true if there is an inverse offer filter for this framework
+  // on this slave.
+  bool isFiltered(
+      const FrameworkID& frameworkID,
+      const SlaveID& slaveID);
+
   bool allocatable(const Resources& resources);
 
   bool initialized;
@@ -251,8 +264,9 @@ protected:
     // Whether the framework desires revocable resources.
     bool revocable;
 
-    // Active filters on offers for the framework.
+    // Active offer and inverse offer filters for the framework.
     hashmap<SlaveID, hashset<OfferFilter*>> offerFilters;
+    hashmap<SlaveID, hashset<InverseOfferFilter*>> inverseOfferFilters;
   };
 
   double _event_queue_dispatches()
@@ -382,6 +396,41 @@ public:
 };
 
 
+// Used to represent "filters" for inverse offers.
+// NOTE: Since this specific allocator implementation only sends inverse offers
+// for maintenance primitives, and those are at the whole slave level, we only
+// need to filter based on the time-out.
+// If this allocator implementation starts sending out more resource specific
+// inverse offers, then we can capture the `unavailableResources` in the filter
+// function.
+class InverseOfferFilter
+{
+public:
+  virtual ~InverseOfferFilter() {}
+
+  virtual bool filter() = 0;
+};
+
+
+// NOTE: See comment above `InverseOfferFilter` regarding capturing
+// `unavailableResources` if this allocator starts sending fine-grained inverse
+// offers.
+class RefusedInverseOfferFilter: public InverseOfferFilter
+{
+public:
+  RefusedInverseOfferFilter(const process::Timeout& _timeout)
+    : timeout(_timeout) {}
+
+  virtual bool filter()
+  {
+    // See the comment above for why we don't do more fine-grained filtering.
+    return timeout.remaining() > Seconds(0);
+  }
+
+  const process::Timeout timeout;
+};
+
+
 template <class RoleSorter, class FrameworkSorter>
 void
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::initialize(
@@ -541,6 +590,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deactivateFramework(
   // HierarchicalAllocatorProcess::reviveOffers and
   // HierarchicalAllocatorProcess::expire.
   frameworks[frameworkId].offerFilters.clear();
+  frameworks[frameworkId].inverseOfferFilters.clear();
 
   LOG(INFO) << "Deactivated framework " << frameworkId;
 }
@@ -860,6 +910,15 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateUnavailability(
   // NOTE: We currently implement maintenance in the allocator to be able to
   // leverage state and features such as the FrameworkSorter and OfferFilter.
 
+  // We explicitly remove all filters for the inverse offers of this slave. We
+  // do this because we want to force frameworks to reassess the calculations
+  // they have made to respond to the inverse offer. Unavailability of a slave
+  // can have a large effect on failure domain calculations and interleaved
+  // unavailability schedules.
+  foreachvalue (Framework& framework, frameworks) {
+    framework.inverseOfferFilters.erase(slaveId);
+  }
+
   // Remove any old unavailability.
   slaves[slaveId].maintenance = None();
 
@@ -879,7 +938,8 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
     const SlaveID& slaveId,
     const FrameworkID& frameworkId,
     const Option<UnavailableResources>& unavailableResources,
-    const Option<mesos::master::InverseOfferStatus>& status)
+    const Option<mesos::master::InverseOfferStatus>& status,
+    const Option<Filters>& filters)
 {
   CHECK(initialized);
   CHECK(frameworks.contains(frameworkId));
@@ -915,6 +975,58 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
       maintenance.statuses[frameworkId].CopyFrom(status.get());
     }
   }
+
+  // No need to install filters if `filters` is none.
+  if (filters.isNone()) {
+    return;
+  }
+
+  // Create a refused inverse offer filter.
+  Try<Duration> seconds = Duration::create(filters.get().refuse_seconds());
+
+  if (seconds.isError()) {
+    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+                 << "the refused inverse offer filter because the input value "
+                 << "is invalid: " << seconds.error();
+
+    seconds = Duration::create(Filters().refuse_seconds());
+  } else if (seconds.get() < Duration::zero()) {
+    LOG(WARNING) << "Using the default value of 'refuse_seconds' to create "
+                 << "the refused inverse offer filter because the input value "
+                 << "is negative";
+
+    seconds = Duration::create(Filters().refuse_seconds());
+  }
+
+  CHECK_SOME(seconds);
+
+  if (seconds.get() != Duration::zero()) {
+    VLOG(1) << "Framework " << frameworkId
+            << " filtered inverse offers from slave " << slaveId
+            << " for " << seconds.get();
+
+    // Create a new inverse offer filter and delay its expiration.
+    InverseOfferFilter* inverseOfferFilter =
+      new RefusedInverseOfferFilter(process::Timeout::in(seconds.get()));
+
+    frameworks[frameworkId]
+      .inverseOfferFilters[slaveId].insert(inverseOfferFilter);
+
+    // We need to disambiguate the function call to pick the correct
+    // expire() overload.
+    void (Self::*expireInverseOffer)(
+             const FrameworkID&,
+             const SlaveID&,
+             InverseOfferFilter*) = &Self::expire;
+
+    delay(
+        seconds.get(),
+        self(),
+        expireInverseOffer,
+        frameworkId,
+        slaveId,
+        inverseOfferFilter);
+  }
 }
 
 
@@ -1009,10 +1121,17 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::recoverResources(
 
     frameworks[frameworkId].offerFilters[slaveId].insert(offerFilter);
 
+    // We need to disambiguate the function call to pick the correct
+    // expire() overload.
+    void (Self::*expireOffer)(
+              const FrameworkID&,
+              const SlaveID&,
+              OfferFilter*) = &Self::expire;
+
     delay(
         seconds.get(),
         self(),
-        &Self::expire,
+        expireOffer,
         frameworkId,
         slaveId,
         offerFilter);
@@ -1038,6 +1157,7 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::reviveOffers(
   CHECK(initialized);
 
   frameworks[frameworkId].offerFilters.clear();
+  frameworks[frameworkId].inverseOfferFilters.clear();
   frameworks[frameworkId].quiesced = false;
 
   // We delete each actual `OfferFilter` when
@@ -1239,14 +1359,26 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::deallocate(
             // If there isn't already an outstanding inverse offer to this
             // framework for the specified slave.
             if (!maintenance.offersOutstanding.contains(frameworkId)) {
+              // Ignore in case the framework filters inverse offers for this
+              // slave.
+              // NOTE: Since this specific allocator implementation only sends
+              // inverse offers for maintenance primitives, and those are at the
+              // whole slave level, we only need to filter based on the
+              // time-out.
+              if (isFiltered(frameworkId, slaveId)) {
+                continue;
+              }
+
+              const UnavailableResources unavailableResources =
+                UnavailableResources{
+                    Resources(),
+                    maintenance.unavailability};
+
               // For now we send inverse offers with empty resources when the
               // inverse offer represents maintenance on the machine. In the
               // future we could be more specific about the resources on the
               // host, as we have the information available.
-              offerable[frameworkId][slaveId] =
-                UnavailableResources{
-                    Resources(),
-                    maintenance.unavailability};
+              offerable[frameworkId][slaveId] = unavailableResources;
 
               // Mark this framework as having an offer oustanding for the
               // specified slave.
@@ -1295,6 +1427,34 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
 
 
 template <class RoleSorter, class FrameworkSorter>
+void
+HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::expire(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId,
+    InverseOfferFilter* inverseOfferFilter)
+{
+  // The filter might have already been removed (e.g., if the
+  // framework no longer exists or in
+  // HierarchicalAllocatorProcess::reviveOffers) but not yet deleted (to
+  // keep the address from getting reused possibly causing premature
+  // expiration).
+  if (frameworks.contains(frameworkId) &&
+      frameworks[frameworkId].inverseOfferFilters.contains(slaveId) &&
+      frameworks[frameworkId].inverseOfferFilters[slaveId]
+        .contains(inverseOfferFilter)) {
+    frameworks[frameworkId].inverseOfferFilters[slaveId]
+      .erase(inverseOfferFilter);
+
+    if(frameworks[frameworkId].inverseOfferFilters[slaveId].empty()) {
+      frameworks[frameworkId].inverseOfferFilters.erase(slaveId);
+    }
+  }
+
+  delete inverseOfferFilter;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
 bool
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isWhitelisted(
     const SlaveID& slaveId)
@@ -1345,6 +1505,31 @@ HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
 
 
 template <class RoleSorter, class FrameworkSorter>
+bool HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::isFiltered(
+    const FrameworkID& frameworkId,
+    const SlaveID& slaveId)
+{
+  CHECK(frameworks.contains(frameworkId));
+  CHECK(slaves.contains(slaveId));
+
+  if (frameworks[frameworkId].inverseOfferFilters.contains(slaveId)) {
+    foreach (
+        InverseOfferFilter* inverseOfferFilter,
+        frameworks[frameworkId].inverseOfferFilters[slaveId]) {
+      if (inverseOfferFilter->filter()) {
+        VLOG(1) << "Filtered unavailability on slave " << slaveId
+                << " for framework " << frameworkId;
+
+        return true;
+      }
+    }
+  }
+
+  return false;
+}
+
+
+template <class RoleSorter, class FrameworkSorter>
 bool
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::allocatable(
     const Resources& resources)

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5393ee8..6c0db21 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2850,7 +2850,8 @@ void Master::accept(
             UnavailableResources{
                 inverseOffer->resources(),
                 inverseOffer->unavailability()},
-            status);
+            status,
+            accept.filters());
 
         removeInverseOffer(inverseOffer);
         continue;
@@ -3324,7 +3325,8 @@ void Master::decline(
           UnavailableResources{
               inverseOffer->resources(),
               inverseOffer->unavailability()},
-          status);
+          status,
+          decline.filters());
 
       removeInverseOffer(inverseOffer);
       continue;

http://git-wip-us.apache.org/repos/asf/mesos/blob/9d03297a/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index dd587bb..e1c0635 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1361,7 +1361,7 @@ ACTION_P(InvokeUpdateUnavailability, allocator)
 
 ACTION_P(InvokeUpdateInverseOffer, allocator)
 {
-  return allocator->real->updateInverseOffer(arg0, arg1, arg2, arg3);
+  return allocator->real->updateInverseOffer(arg0, arg1, arg2, arg3, arg4);
 }
 
 
@@ -1499,9 +1499,9 @@ public:
     EXPECT_CALL(*this, updateUnavailability(_, _))
       .WillRepeatedly(DoDefault());
 
-    ON_CALL(*this, updateInverseOffer(_, _, _, _))
+    ON_CALL(*this, updateInverseOffer(_, _, _, _, _))
       .WillByDefault(InvokeUpdateInverseOffer(this));
-    EXPECT_CALL(*this, updateInverseOffer(_, _, _, _))
+    EXPECT_CALL(*this, updateInverseOffer(_, _, _, _, _))
       .WillRepeatedly(DoDefault());
 
     ON_CALL(*this, recoverResources(_, _, _, _))
@@ -1590,11 +1590,12 @@ public:
       const SlaveID&,
       const Option<Unavailability>&));
 
-  MOCK_METHOD4(updateInverseOffer, void(
+  MOCK_METHOD5(updateInverseOffer, void(
       const SlaveID&,
       const FrameworkID&,
       const Option<UnavailableResources>&,
-      const Option<mesos::master::InverseOfferStatus>&));
+      const Option<mesos::master::InverseOfferStatus>&,
+      const Option<Filters>&));
 
   MOCK_METHOD4(recoverResources, void(
       const FrameworkID&,


[2/5] mesos git commit: Propagated UnavailableResources from Inverse Offers to the allocator.

Posted by jo...@apache.org.
Propagated UnavailableResources from Inverse Offers to the allocator.

Review: https://reviews.apache.org/r/38246


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/eec3fec0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/eec3fec0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/eec3fec0

Branch: refs/heads/master
Commit: eec3fec01a28d48fd26a71de7d918df71032d4ef
Parents: f388394
Author: Artem Harutyunyan <ar...@mesosphere.io>
Authored: Sat Sep 19 14:24:23 2015 -0400
Committer: Joris Van Remoortere <jo...@gmail.com>
Committed: Sun Sep 20 14:21:09 2015 -0400

----------------------------------------------------------------------
 include/mesos/master/allocator.hpp          |  5 ++++-
 src/master/allocator/mesos/allocator.hpp    |  4 ++++
 src/master/allocator/mesos/hierarchical.hpp |  2 ++
 src/master/master.cpp                       | 27 ++++++++++++++++++++++++
 src/tests/mesos.hpp                         |  9 ++++----
 5 files changed, 42 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/eec3fec0/include/mesos/master/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/master/allocator.hpp b/include/mesos/master/allocator.hpp
index 2dc6312..7301058 100644
--- a/include/mesos/master/allocator.hpp
+++ b/include/mesos/master/allocator.hpp
@@ -153,10 +153,13 @@ public:
   // revoked. If `status` is not set then the inverse offer was not responded
   // to, possibly because the offer timed out or was rescinded. This might
   // require the implementation of the function to remove any inverse offers
-  // that are outstanding.
+  // that are outstanding. The `unavailableResources` can be used by the
+  // allocator to distinguish between different inverse offers sent to the same
+  // framework for the same slave.
   virtual void updateInverseOffer(
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
+      const Option<UnavailableResources>& unavailableResources,
       const Option<InverseOfferStatus>& status) = 0;
 
   // Informs the Allocator to recover resources that are considered

http://git-wip-us.apache.org/repos/asf/mesos/blob/eec3fec0/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 86f6c55..4f02dd1 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -119,6 +119,7 @@ public:
   void updateInverseOffer(
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
+      const Option<UnavailableResources>& unavailableResources,
       const Option<mesos::master::InverseOfferStatus>& status);
 
   void recoverResources(
@@ -226,6 +227,7 @@ public:
   virtual void updateInverseOffer(
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
+      const Option<UnavailableResources>& unavailableResources,
       const Option<mesos::master::InverseOfferStatus>& status) = 0;
 
   virtual void recoverResources(
@@ -486,6 +488,7 @@ template <typename AllocatorProcess>
 inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
     const SlaveID& slaveId,
     const FrameworkID& frameworkId,
+    const Option<UnavailableResources>& unavailableResources,
     const Option<mesos::master::InverseOfferStatus>& status)
 {
   return process::dispatch(
@@ -493,6 +496,7 @@ inline void MesosAllocator<AllocatorProcess>::updateInverseOffer(
       &MesosAllocatorProcess::updateInverseOffer,
       slaveId,
       frameworkId,
+      unavailableResources,
       status);
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/eec3fec0/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index e944120..a4c4107 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -157,6 +157,7 @@ public:
   void updateInverseOffer(
       const SlaveID& slaveId,
       const FrameworkID& frameworkId,
+      const Option<UnavailableResources>& unavailableResources,
       const Option<mesos::master::InverseOfferStatus>& status);
 
   void recoverResources(
@@ -877,6 +878,7 @@ void
 HierarchicalAllocatorProcess<RoleSorter, FrameworkSorter>::updateInverseOffer(
     const SlaveID& slaveId,
     const FrameworkID& frameworkId,
+    const Option<UnavailableResources>& unavailableResources,
     const Option<mesos::master::InverseOfferStatus>& status)
 {
   CHECK(initialized);

http://git-wip-us.apache.org/repos/asf/mesos/blob/eec3fec0/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 5eef29b..5393ee8 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2343,6 +2343,9 @@ void Master::_subscribe(
         allocator->updateInverseOffer(
             inverseOffer->slave_id(),
             inverseOffer->framework_id(),
+            UnavailableResources{
+                inverseOffer->resources(),
+                inverseOffer->unavailability()},
             None());
 
         removeInverseOffer(inverseOffer, true); // Rescind.
@@ -2508,6 +2511,9 @@ void Master::deactivate(Framework* framework)
     allocator->updateInverseOffer(
         inverseOffer->slave_id(),
         inverseOffer->framework_id(),
+        UnavailableResources{
+            inverseOffer->resources(),
+            inverseOffer->unavailability()},
         None());
 
     removeInverseOffer(inverseOffer, true); // Rescind.
@@ -2557,6 +2563,9 @@ void Master::deactivate(Slave* slave)
     allocator->updateInverseOffer(
         slave->id,
         inverseOffer->framework_id(),
+        UnavailableResources{
+            inverseOffer->resources(),
+            inverseOffer->unavailability()},
         None());
 
     removeInverseOffer(inverseOffer, true); // Rescind!
@@ -2838,6 +2847,9 @@ void Master::accept(
         allocator->updateInverseOffer(
             inverseOffer->slave_id(),
             inverseOffer->framework_id(),
+            UnavailableResources{
+                inverseOffer->resources(),
+                inverseOffer->unavailability()},
             status);
 
         removeInverseOffer(inverseOffer);
@@ -3309,6 +3321,9 @@ void Master::decline(
       allocator->updateInverseOffer(
           inverseOffer->slave_id(),
           inverseOffer->framework_id(),
+          UnavailableResources{
+              inverseOffer->resources(),
+              inverseOffer->unavailability()},
           status);
 
       removeInverseOffer(inverseOffer);
@@ -4320,6 +4335,9 @@ void Master::updateUnavailability(
         allocator->updateInverseOffer(
             slave->id,
             inverseOffer->framework_id(),
+            UnavailableResources{
+                inverseOffer->resources(),
+                inverseOffer->unavailability()},
             None());
 
         removeInverseOffer(inverseOffer, true); // Rescind!
@@ -5494,6 +5512,9 @@ void Master::_failoverFramework(Framework* framework)
     allocator->updateInverseOffer(
         inverseOffer->slave_id(),
         inverseOffer->framework_id(),
+        UnavailableResources{
+            inverseOffer->resources(),
+            inverseOffer->unavailability()},
         None());
 
     removeInverseOffer(inverseOffer);
@@ -5606,6 +5627,9 @@ void Master::removeFramework(Framework* framework)
     allocator->updateInverseOffer(
         inverseOffer->slave_id(),
         inverseOffer->framework_id(),
+        UnavailableResources{
+            inverseOffer->resources(),
+            inverseOffer->unavailability()},
         None());
 
     removeInverseOffer(inverseOffer);
@@ -6247,6 +6271,9 @@ void Master::inverseOfferTimeout(const OfferID& inverseOfferId)
     allocator->updateInverseOffer(
         inverseOffer->slave_id(),
         inverseOffer->framework_id(),
+        UnavailableResources{
+            inverseOffer->resources(),
+            inverseOffer->unavailability()},
         None());
 
     removeInverseOffer(inverseOffer, true);

http://git-wip-us.apache.org/repos/asf/mesos/blob/eec3fec0/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 760dcb7..dd587bb 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -1361,7 +1361,7 @@ ACTION_P(InvokeUpdateUnavailability, allocator)
 
 ACTION_P(InvokeUpdateInverseOffer, allocator)
 {
-  return allocator->real->updateInverseOffer(arg0, arg1, arg2);
+  return allocator->real->updateInverseOffer(arg0, arg1, arg2, arg3);
 }
 
 
@@ -1499,9 +1499,9 @@ public:
     EXPECT_CALL(*this, updateUnavailability(_, _))
       .WillRepeatedly(DoDefault());
 
-    ON_CALL(*this, updateInverseOffer(_, _, _))
+    ON_CALL(*this, updateInverseOffer(_, _, _, _))
       .WillByDefault(InvokeUpdateInverseOffer(this));
-    EXPECT_CALL(*this, updateInverseOffer(_, _, _))
+    EXPECT_CALL(*this, updateInverseOffer(_, _, _, _))
       .WillRepeatedly(DoDefault());
 
     ON_CALL(*this, recoverResources(_, _, _, _))
@@ -1590,9 +1590,10 @@ public:
       const SlaveID&,
       const Option<Unavailability>&));
 
-  MOCK_METHOD3(updateInverseOffer, void(
+  MOCK_METHOD4(updateInverseOffer, void(
       const SlaveID&,
       const FrameworkID&,
+      const Option<UnavailableResources>&,
       const Option<mesos::master::InverseOfferStatus>&));
 
   MOCK_METHOD4(recoverResources, void(