You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/11/30 17:35:54 UTC

[1/8] mesos git commit: Made sure only non-terminal operations use resources.

Repository: mesos
Updated Branches:
  refs/heads/master f7858bb3a -> c035e7e87


Made sure only non-terminal operations use resources.

The function 'addOfferOperation' is a high-level function which can,
to some extent, be used to manage the offer operation life cycle in the
master. In addition to managing the lifetime of offer operations, it
also maintains the state of used resources.

We already made sure to never count speculated operations as using
resources in this function. In addition, the function
'updateOfferOperation' is already capable of updating the used
resources when an offer operation becomes terminal. To complete this
division of responsibilities, in this patch we extend the conditional
treatment in 'addOfferOperation' to also take into account whether
the operation is terminal when deciding whether to mutate the used
resources.

Review: https://reviews.apache.org/r/64163/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/567f250e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/567f250e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/567f250e

Branch: refs/heads/master
Commit: 567f250e5b7af391b7ba17c2b5e3395daa03d017
Parents: f7858bb
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Thu Nov 30 17:03:18 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Nov 30 18:33:57 2017 +0100

----------------------------------------------------------------------
 src/master/master.cpp | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/567f250e/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index fadc78b..3470db9 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -10847,7 +10847,8 @@ void Slave::addOfferOperation(OfferOperation* operation)
 
   offerOperations.put(uuid.get(), operation);
 
-  if (!protobuf::isSpeculativeOperation(operation->info())) {
+  if (!protobuf::isSpeculativeOperation(operation->info()) &&
+      !protobuf::isTerminalState(operation->latest_status().state())) {
     Try<Resources> consumed = protobuf::getConsumedResources(operation->info());
 
     CHECK_SOME(consumed);


[8/8] mesos git commit: Removed currently unneeded 'AWAIT_READY's in 'MockResourceProvider'.

Posted by bb...@apache.org.
Removed currently unneeded 'AWAIT_READY's in 'MockResourceProvider'.

The 'AWAIT_READY's in 'MockResourceProvider' led to the resource
provider not being usable with a paused clock, as we would run into a
deadlock in that case.

This patch removes the explicit awaits here to make this resource
provider usable. We should consider making it easier to debug send
failures in the resource provider, e.g., by surfacing the future
responses to users in some way, or by adding additional logging.

Review: https://reviews.apache.org/r/64086/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a622d940
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a622d940
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a622d940

Branch: refs/heads/master
Commit: a622d9406e46d574e31dd6ce7e1a31e56a3f294e
Parents: 8ccbdb9
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Thu Nov 30 17:04:04 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Nov 30 18:33:58 2017 +0100

----------------------------------------------------------------------
 src/tests/mesos.hpp | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a622d940/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 5fe5356..c871878 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -2904,7 +2904,7 @@ public:
     call.mutable_subscribe()->mutable_resource_provider_info()->set_name(
         "test");
 
-    AWAIT_READY(driver->send(call));
+    driver->send(call);
   }
 
   void subscribedDefault(const typename Event::Subscribed& subscribed)
@@ -2927,7 +2927,7 @@ public:
       update->mutable_resources()->CopyFrom(injected);
       update->set_resource_version_uuid(UUID::random().toBytes());
 
-      AWAIT_READY(driver->send(call));
+      driver->send(call);
     }
   }
 
@@ -3003,7 +3003,7 @@ public:
 
     update->mutable_latest_status()->CopyFrom(update->status());
 
-    AWAIT_READY(driver->send(call));
+    driver->send(call);
   }
 
   Option<ResourceProviderID> resourceProviderId;


[5/8] mesos git commit: Added a new allocator method to add resources to agents.

Posted by bb...@apache.org.
Added a new allocator method to add resources to agents.

The added method complements 'Allocator::addSlave'. While 'addSlave'
is passed the total agent resources and the used resources, the
method 'addResourceProvider' added here allows adding additional,
potentially used resources to an existing agent.

Review: https://reviews.apache.org/r/63997/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b220abcc
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b220abcc
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b220abcc

Branch: refs/heads/master
Commit: b220abcc0118204a6fa63a3c287ea7272af42b50
Parents: 8c0f8a4
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Thu Nov 30 17:03:50 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Nov 30 18:33:58 2017 +0100

----------------------------------------------------------------------
 include/mesos/allocator/allocator.hpp       |  13 +++
 src/master/allocator/mesos/allocator.hpp    |  25 +++++
 src/master/allocator/mesos/hierarchical.cpp |  21 ++++
 src/master/allocator/mesos/hierarchical.hpp |   5 +
 src/tests/allocator.hpp                     |  16 +++
 src/tests/hierarchical_allocator_tests.cpp  | 135 +++++++++++++++++++++++
 6 files changed, 215 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b220abcc/include/mesos/allocator/allocator.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/allocator/allocator.hpp b/include/mesos/allocator/allocator.hpp
index ae12200..acb9e4f 100644
--- a/include/mesos/allocator/allocator.hpp
+++ b/include/mesos/allocator/allocator.hpp
@@ -215,6 +215,19 @@ public:
           capabilities = None()) = 0;
 
   /**
+   * Add resources from a local resource provider to an agent.
+   *
+   * @param slave Id of the agent to modify.
+   * @param total The resources to add to the agent's total resources.
+   * @param used The resources to add to the resources tracked as used
+   *     for this agent.
+   */
+  virtual void addResourceProvider(
+      const SlaveID& slave,
+      const Resources& total,
+      const hashmap<FrameworkID, Resources>& used) = 0;
+
+  /**
    * Activates an agent. This is invoked when an agent reregisters. Offers
    * are only sent for activated agents.
    */

http://git-wip-us.apache.org/repos/asf/mesos/blob/b220abcc/src/master/allocator/mesos/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/allocator.hpp b/src/master/allocator/mesos/allocator.hpp
index 8fa4fde..48254b6 100644
--- a/src/master/allocator/mesos/allocator.hpp
+++ b/src/master/allocator/mesos/allocator.hpp
@@ -102,6 +102,11 @@ public:
       const Option<Resources>& total = None(),
       const Option<std::vector<SlaveInfo::Capability>>& capabilities = None());
 
+  void addResourceProvider(
+      const SlaveID& slave,
+      const Resources& total,
+      const hashmap<FrameworkID, Resources>& used);
+
   void activateSlave(
       const SlaveID& slaveId);
 
@@ -243,6 +248,11 @@ public:
       const Option<std::vector<SlaveInfo::Capability>>&
           capabilities = None()) = 0;
 
+  virtual void addResourceProvider(
+      const SlaveID& slave,
+      const Resources& total,
+      const hashmap<FrameworkID, Resources>& used) = 0;
+
   virtual void activateSlave(
       const SlaveID& slaveId) = 0;
 
@@ -490,6 +500,21 @@ inline void MesosAllocator<AllocatorProcess>::updateSlave(
 
 
 template <typename AllocatorProcess>
+void MesosAllocator<AllocatorProcess>::addResourceProvider(
+    const SlaveID& slave,
+    const Resources& total,
+    const hashmap<FrameworkID, Resources>& used)
+{
+  process::dispatch(
+      process,
+      &MesosAllocatorProcess::addResourceProvider,
+      slave,
+      total,
+      used);
+}
+
+
+template <typename AllocatorProcess>
 inline void MesosAllocator<AllocatorProcess>::activateSlave(
     const SlaveID& slaveId)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b220abcc/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 6b0be6a..ab2abf8 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -627,6 +627,27 @@ void HierarchicalAllocatorProcess::updateSlave(
 }
 
 
+void HierarchicalAllocatorProcess::addResourceProvider(
+    const SlaveID& slaveId,
+    const Resources& total,
+    const hashmap<FrameworkID, Resources>& used)
+{
+  CHECK(initialized);
+  CHECK(slaves.contains(slaveId));
+
+  trackAllocatedResources(slaveId, used);
+
+  Slave& slave = slaves.at(slaveId);
+  updateSlaveTotal(slaveId, slave.total + total);
+  slave.allocated += Resources::sum(used);
+
+  VLOG(1)
+    << "Grew agent " << slaveId << " by "
+    << total << " (total), "
+    << used << " (used)";
+}
+
+
 void HierarchicalAllocatorProcess::activateSlave(
     const SlaveID& slaveId)
 {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b220abcc/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index fd604c9..3c87dc7 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -147,6 +147,11 @@ public:
       const Option<Resources>& total = None(),
       const Option<std::vector<SlaveInfo::Capability>>& capabilities = None());
 
+  void addResourceProvider(
+      const SlaveID& slave,
+      const Resources& total,
+      const hashmap<FrameworkID, Resources>& used);
+
   void deactivateSlave(
       const SlaveID& slaveId);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b220abcc/src/tests/allocator.hpp
----------------------------------------------------------------------
diff --git a/src/tests/allocator.hpp b/src/tests/allocator.hpp
index 6a84f1b..fc5d9ef 100644
--- a/src/tests/allocator.hpp
+++ b/src/tests/allocator.hpp
@@ -103,6 +103,12 @@ ACTION_P(InvokeUpdateSlave, allocator)
 }
 
 
+ACTION_P(InvokeAddResourceProvider, allocator)
+{
+  allocator->real->addResourceProvider(arg0, arg1, arg2);
+}
+
+
 ACTION_P(InvokeActivateSlave, allocator)
 {
   allocator->real->activateSlave(arg0);
@@ -279,6 +285,11 @@ public:
     EXPECT_CALL(*this, updateSlave(_, _, _))
       .WillRepeatedly(DoDefault());
 
+    ON_CALL(*this, addResourceProvider(_, _, _))
+      .WillByDefault(InvokeAddResourceProvider(this));
+    EXPECT_CALL(*this, addResourceProvider(_, _, _))
+      .WillRepeatedly(DoDefault());
+
     ON_CALL(*this, activateSlave(_))
       .WillByDefault(InvokeActivateSlave(this));
     EXPECT_CALL(*this, activateSlave(_))
@@ -410,6 +421,11 @@ public:
       const Option<Resources>&,
       const Option<std::vector<SlaveInfo::Capability>>&));
 
+  MOCK_METHOD3(addResourceProvider, void(
+      const SlaveID&,
+      const Resources&,
+      const hashmap<FrameworkID, Resources>&));
+
   MOCK_METHOD1(activateSlave, void(
       const SlaveID&));
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/b220abcc/src/tests/hierarchical_allocator_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/hierarchical_allocator_tests.cpp b/src/tests/hierarchical_allocator_tests.cpp
index f0f95ba..0309074 100644
--- a/src/tests/hierarchical_allocator_tests.cpp
+++ b/src/tests/hierarchical_allocator_tests.cpp
@@ -1402,6 +1402,141 @@ TEST_F(HierarchicalAllocatorTest, RecoverResources)
 }
 
 
+// Checks that resource provider resources can be added to an agent
+// and that the added used resources are correctly taken into account
+// when computing fair share.
+TEST_F(HierarchicalAllocatorTest, AddResourceProvider)
+{
+  Clock::pause();
+
+  initialize();
+
+  // Register two deactivated frameworks.
+  FrameworkInfo framework1 = createFrameworkInfo({"role1"});
+  allocator->addFramework(framework1.id(), framework1, {}, false, {});
+
+  FrameworkInfo framework2 = createFrameworkInfo({"role2"});
+  allocator->addFramework(framework2.id(), framework2, {}, false, {});
+
+  // Add a single agent with `resources` resources.
+  const Resources resources = Resources::parse("cpus:1;mem:100;disk:10").get();
+
+  SlaveInfo slave1 = createSlaveInfo(resources);
+  allocator->addSlave(
+      slave1.id(),
+      slave1,
+      AGENT_CAPABILITIES(),
+      None(),
+      slave1.resources(),
+      {});
+
+  {
+    // Add a resource provider with `resources*2` to the agent, all in
+    // use by `framework1`.
+    Resources allocation = resources + resources;
+    allocation.allocate("role1");
+    allocator->addResourceProvider(
+        slave1.id(),
+        resources + resources,
+        {{framework1.id(), allocation}});
+  }
+
+  // Activate `framework2`. The next allocation will be to
+  // `framework2` which is the only active framework. After that
+  // `framework1`'s dominant share is 2/3 and `framework2`'s is 1/3.
+  allocator->activateFramework(framework2.id());
+
+  {
+    Resources allocation = slave1.resources();
+    allocation.allocate("role2");
+    Allocation expected = Allocation(
+        framework2.id(),
+        {{"role2", {{slave1.id(), allocation}}}});
+
+    AWAIT_EXPECT_EQ(expected, allocations.get());
+  }
+
+  // Activate `framework1` so it can receive offers. Currently all
+  // available resources are allocated.
+  allocator->activateFramework(framework1.id());
+
+  // Add another agent with `resources` resources. With that
+  // `framework1` now has a dominant share of 2/4 and `framework2` of
+  // 1/4.
+  SlaveInfo slave2 = createSlaveInfo(resources);
+  allocator->addSlave(
+      slave2.id(),
+      slave2,
+      AGENT_CAPABILITIES(),
+      None(),
+      slave2.resources(),
+      {});
+
+  {
+    // The next allocation will be to `framework2` since it is
+    // furthest below fair share.
+    Resources allocation = slave2.resources();
+    allocation.allocate("role2");
+    Allocation expected = Allocation(
+        framework2.id(),
+        {{"role2", {{slave2.id(), allocation}}}});
+
+    AWAIT_EXPECT_EQ(expected, allocations.get());
+  }
+}
+
+
+// Check that even if an overallocated resource provider is added to an
+// agent, new allocations are only made for unused agent resources.
+TEST_F(HierarchicalAllocatorTest, AddResourceProviderOverallocated)
+{
+  Clock::pause();
+
+  initialize();
+
+  const Resources resources = Resources::parse("cpus:1;mem:100;disk:10").get();
+
+  // Register an agent.
+  SlaveInfo slave = createSlaveInfo(resources + resources);
+  allocator->addSlave(
+      slave.id(),
+      slave,
+      AGENT_CAPABILITIES(),
+      None(),
+      slave.resources(),
+      {});
+
+  // Register a framework in deactivated state
+  // so it initially does not receive offers.
+  FrameworkInfo framework = createFrameworkInfo({"role"});
+  allocator->addFramework(framework.id(), framework, {}, false, {});
+
+  // Track an allocation to the framework of half the agent's resources. We add
+  // no new resources to the total, but just increment the used resources.
+  Resources allocation = resources;
+  allocation.allocate("role");
+  allocator->addResourceProvider(
+      slave.id(),
+      Resources(),
+      {{framework.id(), allocation}});
+
+  // Activate framework so it receives offers.
+  allocator->activateFramework(framework.id());
+
+  // Trigger a batch allocation. In the subsequent offer we expect the
+  // framework to receive the other half of the agent's resources so
+  // that it now has all its resources allocated to it.
+  Clock::advance(flags.allocation_interval);
+  Clock::settle();
+
+  Allocation expected = Allocation(
+      framework.id(),
+      {{"role", {{slave.id(), allocation}}}});
+
+  AWAIT_EXPECT_EQ(expected, allocations.get());
+}
+
+
 TEST_F(HierarchicalAllocatorTest, Allocatable)
 {
   // Pausing the clock is not necessary, but ensures that the test


[2/8] mesos git commit: Reconciled offer operations between agent and master.

Posted by bb...@apache.org.
Reconciled offer operations between agent and master.

This patch adds master reconciliation logic to interpret agent offer
operation state from 'UpdateSlaveMessage'. The approach we take is to
unpack the agent update into per-resource-provider updates and update
the master's view of each resource provider individually.

Review: https://reviews.apache.org/r/63732/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8ccbdb9b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8ccbdb9b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8ccbdb9b

Branch: refs/heads/master
Commit: 8ccbdb9babe960f86bba5ae573e5939ee22981da
Parents: b220abc
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Thu Nov 30 17:03:57 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Nov 30 18:33:58 2017 +0100

----------------------------------------------------------------------
 src/master/master.cpp | 334 ++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 330 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8ccbdb9b/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 16978c0..eadc008 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -25,6 +25,7 @@
 #include <set>
 #include <sstream>
 #include <tuple>
+#include <utility>
 
 #include <mesos/module.hpp>
 #include <mesos/roles.hpp>
@@ -7027,6 +7028,8 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
       message_.mutable_total_resources(),
       POST_RESERVATION_REFINEMENT);
 
+  // Agents will send a total if a resource provider subscribed or went away.
+  // Process resources and operations grouped by resource provider.
   if (hasTotal) {
     const Resources& totalResources = message_.total_resources();
 
@@ -7053,22 +7056,345 @@ void Master::updateSlave(const UpdateSlaveMessage& message)
     newTotal.getOrElse(slave->totalResources.nonRevocable()) +
     newOversubscribed.getOrElse(slave->totalResources.revocable());
 
+  bool updated = slave->totalResources != newSlaveResources;
+
   // Agents which can support resource providers always update the
   // master on their resource versions uuids via `UpdateSlaveMessage`.
   if (slave->capabilities.resourceProvider) {
-    slave->resourceVersions =
+    hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
       protobuf::parseResourceVersions(message.resource_version_uuids());
+
+    updated = updated || slave->resourceVersions != resourceVersions;
+    slave->resourceVersions = resourceVersions;
   }
 
-  if (newSlaveResources == slave->totalResources) {
+  // Check if the known offer operations for this agent changed.
+  updated =
+    updated ||
+    (slave->offerOperations.empty() && message.has_offer_operations()) ||
+    (!slave->offerOperations.empty() && !message.has_offer_operations());
+  if (!updated) {
+    const hashset<UUID> knownOfferOperations = slave->offerOperations.keys();
+    hashset<UUID> receivedOfferOperations;
+
+    foreach (
+        const OfferOperation& operation,
+        message.offer_operations().operations()) {
+        Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
+        CHECK_SOME(operationUuid);
+        receivedOfferOperations.insert(operationUuid.get());
+    }
+
+    updated = updated || knownOfferOperations != receivedOfferOperations;
+  }
+
+  if (!updated) {
     LOG(INFO) << "Ignoring update on agent " << *slave
               << " as it reports no changes";
     return;
   }
 
-  slave->totalResources = newSlaveResources;
+  struct ResourceProvider
+  {
+    Option<Resources> oldTotal;
+    Option<Resources> newTotal;
+    Option<hashmap<UUID, OfferOperation>> oldOfferOperations;
+    Option<hashmap<UUID, OfferOperation>> newOfferOperations;
+  };
+
+  // We store information on the different `ResourceProvider`s on this agent in
+  // a map, indexed by an optional provider id. Since the provider ID field for
+  // resources is only set for resources from true resource providers and is not
+  // set for agent default resources, the value for the key `None` points to
+  // information about the agent itself, not its resource providers.
+  hashmap<Option<ResourceProviderID>, ResourceProvider> resourceProviders;
+
+  // Group the resources and operation updates by resource provider.
+  {
+    auto groupResourcesByProviderId = [](const Resources& resources) {
+      hashmap<Option<ResourceProviderID>, Resources> result;
+
+      foreach (const Resource& resource, resources) {
+        Option<ResourceProviderID> providerId =
+          Resources::hasResourceProvider(resource)
+            ? resource.provider_id()
+            : Option<ResourceProviderID>::none();
+
+        result[std::move(providerId)] += resource;
+      }
+
+      return result;
+    };
+
+    foreachpair (
+        const Option<ResourceProviderID>& providerId,
+        const Resources& resources,
+        groupResourcesByProviderId(slave->totalResources)) {
+      resourceProviders[providerId].oldTotal = resources;
+    }
+
+    foreachpair (
+        const Option<ResourceProviderID>& providerId,
+        const Resources& resources,
+        groupResourcesByProviderId(newSlaveResources)) {
+      // Implicitly create a new record if none exists.
+      resourceProviders[providerId].newTotal = resources;
+    }
+
+    foreachpair (
+        const UUID& uuid,
+        OfferOperation* operation,
+        slave->offerOperations) {
+      Result<ResourceProviderID> providerId_ =
+        getResourceProviderId(operation->info());
+
+      CHECK(!providerId_.isError())
+        << "Failed to extract resource provider id from known operation: "
+        << providerId_.error();
+
+      Option<ResourceProviderID> providerId =
+        providerId_.isSome()
+          ? providerId_.get()
+          : Option<ResourceProviderID>::none();
+
+      // Set up an initially empty list of existing operations. We might
+      // create a record for this resource provider if needed.
+      if (resourceProviders[providerId].oldOfferOperations.isNone()) {
+        resourceProviders.at(providerId).oldOfferOperations =
+          hashmap<UUID, OfferOperation>();
+      }
+
+      resourceProviders.at(providerId)
+        .oldOfferOperations->emplace(uuid, *operation);
+    }
+
+    // Process received offer operations.
+    foreach (
+        const OfferOperation& operation,
+        message.offer_operations().operations()) {
+      Result<ResourceProviderID> providerId_ =
+        getResourceProviderId(operation.info());
+
+      CHECK(!providerId_.isError())
+        << "Failed to extract resource provider id from known operation: "
+        << providerId_.error();
+
+      Option<ResourceProviderID> providerId =
+        providerId_.isSome()
+          ? providerId_.get()
+          : Option<ResourceProviderID>::none();
+
+      // Set up an initially empty list of new operations. We might
+      // create a record for this resource provider if needed.
+      if (resourceProviders[providerId].newOfferOperations.isNone()) {
+        resourceProviders.at(providerId).newOfferOperations =
+          hashmap<UUID, OfferOperation>();
+      }
+
+      Try<UUID> uuid = UUID::fromBytes(operation.operation_uuid());
+      CHECK_SOME(uuid) << "Could not deserialize operation id when reconciling "
+                          "offer operations";
+
+      resourceProviders.at(providerId)
+        .newOfferOperations->emplace(uuid.get(), operation);
+    }
+  }
+
+  // Check invariants of the received update.
+  {
+    foreachpair (
+        const Option<ResourceProviderID>& providerId,
+        const ResourceProvider& provider,
+        resourceProviders) {
+      const bool isNewResourceProvider =
+        provider.oldTotal.isNone() && provider.oldOfferOperations.isNone();
+
+      if (!isNewResourceProvider) {
+        // For known resource providers the master should always know at least
+        // as many non-terminal offer operations as the agent. While an
+        // operation might get lost on the way to the agent or resource
+        // provider, or become terminal inside the agent, the master would never
+        // make an offer operation known to the agent terminal without the
+        // agent doing that first.
+        //
+        // NOTE: We only consider non-terminal operations here as there is an
+        // edge case where the master removes a terminal offer operation from
+        // its own state when it passes on an acknowledgement from a framework
+        // to the agent, but the agent fails over before it can process the
+        // acknowledgement, or the agent initiates an unrelated
+        // `UpdateSlaveMessage`.
+        auto extractPendingOperations =
+          [](const hashmap<UUID, OfferOperation>& source,
+             hashset<UUID>* target) {
+            foreachpair (
+                const UUID& uuid, const OfferOperation& operation, source) {
+              if (!protobuf::isTerminalState(
+                      operation.latest_status().state())) {
+                target->insert(uuid);
+              }
+            }
+          };
+
+        hashset<UUID> oldPendingOperations;
+        hashset<UUID> newPendingOperations;
+
+        if (provider.oldOfferOperations.isSome()) {
+          extractPendingOperations(
+              provider.oldOfferOperations.get(), &oldPendingOperations);
+        }
+
+        if (provider.newOfferOperations.isSome()) {
+          extractPendingOperations(
+              provider.newOfferOperations.get(), &newPendingOperations);
+        }
+
+        foreach (const UUID& uuid, newPendingOperations) {
+          CHECK(oldPendingOperations.contains(uuid))
+            << "Agent tried to reconcile unknown non-terminal offer "
+               "operation "
+            << uuid.toString();
+        }
+      }
+
+      if (providerId.isNone()) {
+        // We do not permit changes to agent (i.e., non-resource
+        // provider) non-revocable resources.
+        CHECK_SOME(provider.oldTotal);
+        CHECK_SOME(provider.newTotal);
+
+        Resources oldNonRevocable =
+          provider.oldTotal->nonRevocable().createStrippedScalarQuantity();
+        Resources newNonRevocable =
+          provider.newTotal->nonRevocable().createStrippedScalarQuantity();
+        CHECK_EQ(
+            provider.oldTotal->nonRevocable(),
+            provider.newTotal->nonRevocable());
+
+        // For agents only speculative operations can be reconciled.
+        //
+        // TODO(bbannier): Reconcile agent operations in
+        // `ReregisterSlaveMessage` in which case we expect agents to
+        // send the already known offer operations again here
+        // (possibly with changed status).
+        if (provider.newOfferOperations.isSome()) {
+          foreachvalue (
+              const OfferOperation& operation,
+              provider.newOfferOperations.get()) {
+            CHECK(protobuf::isSpeculativeOperation(operation.info()));
+          }
+        }
+      }
+    }
+  }
+
+  // Update master and allocator state.
+  foreachpair (
+      const Option<ResourceProviderID>& providerId,
+      const ResourceProvider& provider,
+      resourceProviders) {
+    const bool isNewResourceProvider =
+      provider.oldTotal.isNone() && provider.oldOfferOperations.isNone();
+
+    // Below we only add offer operations to our state from resource providers
+    // which are unknown, or possibly remove them for known resource providers.
+    // This works since the master should always know at least as many offer
+    // operations of a known resource provider as the resource provider itself.
+    //
+    // NOTE: We do not mutate offer operation statuses here; this
+    // would be the responsibility of an offer operation status
+    // update handler.
+    //
+    // There still exists an edge case where the master might remove a
+    // terminal offer operation from its state when passing an
+    // acknowledgement from a framework on to the agent with the agent
+    // failing over before the acknowledgement can be processed. In
+    // that case the agent would track an operation unknown to the
+    // master.
+    //
+    // TODO(bbannier): We might want to consider also learning about
+    // new (terminal) operations when observing messages from status
+    // update managers to frameworks.
+
+    if (isNewResourceProvider) {
+      // If this is a not previously seen resource provider with
+      // operations we had a master failover. Add the resources and
+      // operations to our state.
+      CHECK_SOME(providerId);
+      CHECK_SOME(provider.newTotal);
+      CHECK(!slave->totalResources.contains(provider.newTotal.get()));
+
+      slave->totalResources += provider.newTotal.get();
+
+      hashmap<FrameworkID, Resources> usedByOperations;
+
+      if (provider.newOfferOperations.isSome()) {
+        foreachpair (
+            const UUID& uuid,
+            const OfferOperation& operation,
+            provider.newOfferOperations.get()) {
+          // Update to bookkeeping of operations.
+          CHECK(!slave->offerOperations.contains(uuid))
+            << "New operation " << uuid.toString() << " is already known";
+
+          Framework* framework = nullptr;
+          if (operation.has_framework_id()) {
+            framework = getFramework(operation.framework_id());
+          }
+
+          addOfferOperation(framework, slave, new OfferOperation(operation));
+        }
+      }
+
+      allocator->addResourceProvider(
+          slaveId,
+          provider.newTotal.get(),
+          usedByOperations);
+    } else {
+      // If this is a known resource provider or agent its total capacity cannot
+      // have changed, and it would not know about any non-terminal offer
+      // operations not already known to the master. It might, however, not
+      // have received some offer operations since the resource provider or
+      // agent failed over before the message could be received. We need to
+      // remove these operations from our state.
+
+      // Reconcile offer operations. This includes recovering
+      // resources used by operations which did not reach the
+      // agent or resource provider.
+      if (provider.oldOfferOperations.isSome()) {
+        foreachkey (const UUID& uuid, provider.oldOfferOperations.get()) {
+          if (provider.newOfferOperations.isNone() ||
+              !provider.newOfferOperations->contains(uuid)) {
+            // TODO(bbannier): Instead of simply dropping an operation with
+            // `removeOfferOperation` here we should instead send a `Reconcile`
+            // message with a failed state to the agent so its status update
+            // manager can reliably deliver the operation status to the
+            // framework.
+            LOG(WARNING) << "Dropping known offer operation " << uuid.toString()
+                         << " since it was not present in reconciliation "
+                            "message from agent";
+
+            CHECK(slave->offerOperations.contains(uuid));
+            removeOfferOperation(slave->offerOperations.at(uuid));
+          }
+        }
+      }
+
+      // Reconcile the total resources. This includes undoing
+      // speculated operations which are only visible in the total,
+      // but never in the used resources. We explicitly allow for
+      // resource providers to change from or to zero capacity.
+      if (provider.oldTotal.isSome()) {
+        CHECK(slave->totalResources.contains(provider.oldTotal.get()));
+        slave->totalResources -= provider.oldTotal.get();
+      }
+
+      if (provider.newTotal.isSome()) {
+        slave->totalResources += provider.newTotal.get();
+      }
+    }
+  }
 
-  // Now update the agent's resources in the allocator.
+  // Now update the agent's total resources in the allocator.
   allocator->updateSlave(slaveId, slave->totalResources);
 
   // Then rescind outstanding offers affected by the update.


[7/8] mesos git commit: Implemented a test of offer operation reconciliation.

Posted by bb...@apache.org.
Implemented a test of offer operation reconciliation.

Whenever a speculated operation fails in a resource provider, we
expect the agent to trigger an 'UpdateSlaveMessage' to the master so it
can roll back its agent state. This patch adds such a test.

Review: https://reviews.apache.org/r/63843/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/c035e7e8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/c035e7e8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/c035e7e8

Branch: refs/heads/master
Commit: c035e7e87e18cdbbf2ba098520694d2c8e0d635e
Parents: a622d94
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Thu Nov 30 17:04:10 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Nov 30 18:33:58 2017 +0100

----------------------------------------------------------------------
 src/tests/slave_tests.cpp | 232 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 232 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/c035e7e8/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 823bd44..1344e0a 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -31,6 +31,8 @@
 
 #include <mesos/authentication/http/basic_authenticator_factory.hpp>
 
+#include <mesos/v1/mesos.hpp>
+
 #include <mesos/v1/resource_provider/resource_provider.hpp>
 
 #include <process/clock.hpp>
@@ -8821,6 +8823,236 @@ TEST_F(SlaveTest, ResourceVersions)
       reregisterSlaveMessage->resource_version_uuids(0));
 }
 
+
+// This test checks that a resource provider triggers an
+// `UpdateSlaveMessage` to be sent to the master if an non-speculated
+// offer operation fails in the resource provider.
+TEST_F(SlaveTest, ResourceProviderReconciliation)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability and other required capabilities.
+  constexpr SlaveInfo::Capability::Type capabilities[] = {
+    SlaveInfo::Capability::MULTI_ROLE,
+    SlaveInfo::Capability::HIERARCHICAL_ROLE,
+    SlaveInfo::Capability::RESERVATION_REFINEMENT,
+    SlaveInfo::Capability::RESOURCE_PROVIDER};
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  foreach (SlaveInfo::Capability::Type type, capabilities) {
+    SlaveInfo::Capability* capability =
+      slaveFlags.agent_features->add_capabilities();
+    capability->set_type(type);
+  }
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::settle();
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Register a resource provider with the agent.
+  v1::Resources resourceProviderResources = v1::createDiskResource(
+      "200",
+      "*",
+      None(),
+      None(),
+      v1::createDiskSourceRaw());
+
+  v1::MockResourceProvider resourceProvider(resourceProviderResources);
+
+  string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+  if (process::network::openssl::flags().enabled) {
+    scheme = "https";
+  }
+#endif
+
+  process::http::URL url(
+      scheme,
+      slave.get()->pid.address.ip,
+      slave.get()->pid.address.port,
+      slave.get()->pid.id + "/api/v1/resource_provider");
+
+  Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  resourceProvider.start(
+      endpointDetector, ContentType::PROTOBUF, v1::DEFAULT_CREDENTIAL);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Register a framework to excercise offer operations.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+  Future<Nothing> connected;
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(FutureSatisfy(&connected));
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid,
+      ContentType::PROTOBUF,
+      scheduler);
+
+  AWAIT_READY(connected);
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  Future<v1::scheduler::Event::Offers> offers1;
+
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return()); // Ignore heartbeats.
+
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers1))
+    .WillRepeatedly(Return()); // Ignore subsequent offers;
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_role("foo");
+
+  // Subscribe the framework.
+  {
+    Call call;
+    call.set_type(Call::SUBSCRIBE);
+
+    Call::Subscribe* subscribe = call.mutable_subscribe();
+    subscribe->mutable_framework_info()->CopyFrom(frameworkInfo);
+
+    EXPECT_CALL(*scheduler, subscribed(_, _))
+      .WillOnce(FutureArg<1>(&subscribed));
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId = subscribed->framework_id();
+
+  AWAIT_READY(offers1);
+  ASSERT_FALSE(offers1->offers().empty());
+
+  // We now perform a `RESERVE` operation on the offered resources,
+  // but let the operation fail in the resource provider.
+  Future<v1::resource_provider::Event::Operation> operation;
+  EXPECT_CALL(resourceProvider, operation(_))
+    .WillOnce(FutureArg<0>(&operation));
+
+  {
+    const v1::Offer& offer = offers1->offers(0);
+
+    v1::Resources reserved = offer.resources();
+    reserved = reserved.filter(
+        [](const v1::Resource& r) { return r.has_provider_id(); });
+    reserved = reserved.pushReservation(v1::createDynamicReservationInfo(
+        frameworkInfo.role(), frameworkInfo.principal()));
+
+    Call call =
+      v1::createCallAccept(frameworkId, offer, {v1::RESERVE(reserved)});
+    call.mutable_accept()->mutable_filters()->set_refuse_seconds(0);
+
+    mesos.send(call);
+  }
+
+  AWAIT_READY(operation);
+
+  // We expect the agent to send an `UpdateSlaveMessage` since below
+  // the resource provider responds with an `UPDATE_STATE` call.
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Future<v1::scheduler::Event::Offers> offers2;
+
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers2))
+    .WillRepeatedly(Return()); // Ignore subsequent offers;
+
+  // Fail the operation in the resource provider. This should trigger
+  // an `UpdateSlaveMessage` to the master.
+  {
+    CHECK_SOME(resourceProvider.resourceProviderId);
+
+    v1::Resources resourceProviderResources_;
+    foreach (v1::Resource resource, resourceProviderResources) {
+      resource.mutable_provider_id()->CopyFrom(
+          resourceProvider.resourceProviderId.get());
+
+      resourceProviderResources_ += resource;
+    }
+
+    // Update the resource version of the resource provider.
+    UUID resourceVersionUuid = UUID::random();
+
+    v1::resource_provider::Call call;
+
+    call.set_type(v1::resource_provider::Call::UPDATE_STATE);
+    call.mutable_resource_provider_id()->CopyFrom(
+        resourceProvider.resourceProviderId.get());
+
+    v1::resource_provider::Call::UpdateState* updateState =
+      call.mutable_update_state();
+
+    updateState->set_resource_version_uuid(resourceVersionUuid.toBytes());
+    updateState->mutable_resources()->CopyFrom(resourceProviderResources_);
+
+    mesos::v1::OfferOperation* _operation = updateState->add_operations();
+    _operation->mutable_framework_id()->CopyFrom(operation->framework_id());
+    _operation->mutable_info()->CopyFrom(operation->info());
+    _operation->set_operation_uuid(operation->operation_uuid());
+
+    mesos::v1::OfferOperationStatus* lastStatus =
+      _operation->mutable_latest_status();
+    lastStatus->set_state(::mesos::v1::OFFER_OPERATION_FAILED);
+
+    _operation->add_statuses()->CopyFrom(*lastStatus);
+
+    AWAIT_READY(resourceProvider.send(call));
+  }
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // We expect to see the new resource provider resource version in
+  // the `UpdateSlaveMessage`.
+  hashmap<Option<ResourceProviderID>, UUID> resourceVersions =
+    protobuf::parseResourceVersions(
+        updateSlaveMessage->resource_version_uuids());
+  // The reserve operation will still be reported as pending since no offer
+  // operation status update has been received from the resource provider.
+  ASSERT_TRUE(updateSlaveMessage->has_offer_operations());
+  ASSERT_EQ(1, updateSlaveMessage->offer_operations().operations_size());
+
+  const OfferOperation& reserve =
+    updateSlaveMessage->offer_operations().operations(0);
+
+  EXPECT_EQ(Offer::Operation::RESERVE, reserve.info().type());
+  ASSERT_TRUE(reserve.has_latest_status());
+  EXPECT_EQ(OFFER_OPERATION_PENDING, reserve.latest_status().state());
+
+  // The resources are returned to the available pool and the framework will get
+  // offered the same resources as in the previous offer cycle.
+  Clock::advance(masterFlags.allocation_interval);
+  Clock::settle();
+
+  AWAIT_READY(offers2);
+  ASSERT_EQ(1, offers2->offers_size());
+
+  const v1::Offer& offer1 = offers1->offers(0);
+  const v1::Offer& offer2 = offers2->offers(0);
+
+  EXPECT_EQ(
+      v1::Resources(offer1.resources()), v1::Resources(offer2.resources()));
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {


[3/8] mesos git commit: Allowed removing non-terminal offer operations.

Posted by bb...@apache.org.
Allowed removing non-terminal offer operations.

During reconciliation we might be required to remove non-terminal offer
operations from bookkeeping.

Review: https://reviews.apache.org/r/63842/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0deabd3f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0deabd3f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0deabd3f

Branch: refs/heads/master
Commit: 0deabd3fda4485135e334ea045215dea69ebbb09
Parents: 567f250
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Thu Nov 30 17:03:27 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Nov 30 18:33:58 2017 +0100

----------------------------------------------------------------------
 src/master/master.cpp | 23 ++++++++++++++++++-----
 src/master/master.hpp |  3 ---
 src/slave/slave.cpp   |  3 ---
 3 files changed, 18 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0deabd3f/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 3470db9..16978c0 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -9817,9 +9817,6 @@ void Master::removeOfferOperation(OfferOperation* operation)
 {
   CHECK_NOTNULL(operation);
 
-  CHECK(protobuf::isTerminalState(operation->latest_status().state()))
-    << operation->latest_status().state();
-
   // Remove from framework.
   Framework* framework = operation->has_framework_id()
     ? getFramework(operation->framework_id())
@@ -9838,6 +9835,20 @@ void Master::removeOfferOperation(OfferOperation* operation)
 
   slave->removeOfferOperation(operation);
 
+  // If the operation was not speculated and is not terminal we
+  // need to also recover its used resources in the allocator.
+  if (!protobuf::isSpeculativeOperation(operation->info()) &&
+      !protobuf::isTerminalState(operation->latest_status().state())) {
+    Try<Resources> consumed = protobuf::getConsumedResources(operation->info());
+    CHECK_SOME(consumed);
+
+    allocator->recoverResources(
+        operation->framework_id(),
+        operation->slave_id(),
+        consumed.get(),
+        None());
+  }
+
   delete operation;
 }
 
@@ -10921,8 +10932,10 @@ void Slave::removeOfferOperation(OfferOperation* operation)
     << "Unknown offer operation (uuid: " << uuid->toString() << ")"
     << " to agent " << *this;
 
-  CHECK(protobuf::isTerminalState(operation->latest_status().state()))
-    << operation->latest_status().state();
+  if (!protobuf::isSpeculativeOperation(operation->info()) &&
+      !protobuf::isTerminalState(operation->latest_status().state())) {
+    recoverResources(operation);
+  }
 
   offerOperations.erase(uuid.get());
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/0deabd3f/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 00fc421..1425080 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -2624,9 +2624,6 @@ struct Framework
       << "' (uuid: " << uuid->toString() << ") "
       << "of framework " << operation->framework_id();
 
-    CHECK(protobuf::isTerminalState(operation->latest_status().state()))
-      << operation->latest_status().state();
-
     offerOperations.erase(uuid.get());
   }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/0deabd3f/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 554a2da..a415894 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -7046,9 +7046,6 @@ void Slave::removeOfferOperation(OfferOperation* operation)
   CHECK(offerOperations.contains(uuid.get()))
     << "Unknown offer operation (uuid: " << uuid->toString() << ")";
 
-  CHECK(protobuf::isTerminalState(operation->latest_status().state()))
-    << operation->latest_status().state();
-
   offerOperations.erase(uuid.get());
   delete operation;
 }


[4/8] mesos git commit: Reconciled pending resource provider operations in agent.

Posted by bb...@apache.org.
Reconciled pending resource provider operations in agent.

When resource providers update their state they send a list of
pending or unacknowledged operations to the agent. This patch adds
tracking for such operations to the agent. The agent can then forward
these operations to the master, e.g., for calculating the unused
resources behind an agent.

We track an operation until we either receive an updated list of
pending or unacknowledged operations from a resource provider, or
until we see an acknowledgement from a framework. This keeps the list
of operations bounded and ensures that we maintain the latest
information in the agent.

Review: https://reviews.apache.org/r/63731/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/42c14e1e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/42c14e1e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/42c14e1e

Branch: refs/heads/master
Commit: 42c14e1efc81ff617502cd360ad460d729802b2c
Parents: 0deabd3
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Thu Nov 30 17:03:38 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Nov 30 18:33:58 2017 +0100

----------------------------------------------------------------------
 src/slave/slave.cpp | 159 ++++++++++++++++++++++++++++++++++++++++-------
 1 file changed, 138 insertions(+), 21 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/42c14e1e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index a415894..a9aa987 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -1283,6 +1283,13 @@ void Slave::registered(
     message.mutable_resource_categories()->set_total(true);
     message.mutable_total_resources()->CopyFrom(totalResources);
 
+    UpdateSlaveMessage::OfferOperations* operations =
+      message.mutable_offer_operations();
+
+    foreachvalue (const OfferOperation* operation, offerOperations) {
+      operations->add_operations()->CopyFrom(*operation);
+    }
+
     sendUpdateSlaveMessage = true;
   }
 
@@ -1389,6 +1396,13 @@ void Slave::reregistered(
     message.mutable_resource_categories()->set_total(true);
     message.mutable_total_resources()->CopyFrom(totalResources);
 
+    UpdateSlaveMessage::OfferOperations* operations =
+      message.mutable_offer_operations();
+
+    foreachvalue (const OfferOperation* operation, offerOperations) {
+      operations->add_operations()->CopyFrom(*operation);
+    }
+
     sendUpdateSlaveMessage = true;
   }
 
@@ -6798,21 +6812,115 @@ void Slave::handleResourceProviderMessage(
           return resource.provider_id() == resourceProviderId;
         });
 
-      // Ignore the update if it contained no new information.
-      if (newTotal == oldTotal) {
-        break;
+      bool updated = false;
+
+      if (oldTotal != newTotal) {
+        totalResources -= oldTotal;
+        totalResources += newTotal;
+
+        updated = true;
+      }
+
+      // Update offer operation state.
+      //
+      // We only update offer operations which are not contained in both the
+      // known and just received sets. All other offer operations will be
+      // updated via relayed offer operation status updates.
+      auto isForResourceProvider = [resourceProviderId](
+                                      const OfferOperation& operation) {
+        Result<ResourceProviderID> id = getResourceProviderId(operation.info());
+        return id.isSome() && resourceProviderId == id.get();
+      };
+
+      hashmap<UUID, OfferOperation*> knownOfferOperations;
+      foreachpair(auto&& uuid, auto&& operation, offerOperations) {
+        if (isForResourceProvider(*operation)) {
+          knownOfferOperations.put(uuid, operation);
+        }
+      }
+
+      hashmap<UUID, OfferOperation> receivedOfferOperations;
+      foreach (
+          const OfferOperation& operation,
+          message->updateState->operations) {
+        CHECK(isForResourceProvider(operation))
+          << "Received operation on unexpected resource provider "
+          << "from resource provider " << resourceProviderId;
+
+        Try<UUID> operationUuid = UUID::fromBytes(operation.operation_uuid());
+        CHECK_SOME(operationUuid);
+
+        receivedOfferOperations.put(operationUuid.get(), operation);
       }
 
-      totalResources -= oldTotal;
-      totalResources += newTotal;
+      const hashset<UUID> knownUuids = knownOfferOperations.keys();
+      const hashset<UUID> receivedUuids = receivedOfferOperations.keys();
+
+      if (knownUuids != receivedUuids) {
+        // Handle offer operations known to the agent but not reported by the
+        // resource provider. These could be operations where the agent has
+        // started tracking an offer operation, but the resource provider failed
+        // over before it could bookkeep the operation.
+        //
+        // NOTE: We do not mutate offer operations statuses here; this
+        // would be the responsibility of a offer operation status
+        // update handler.
+        hashset<UUID> disappearedOperations;
+        std::set_difference(
+            knownUuids.begin(),
+            knownUuids.end(),
+            receivedUuids.begin(),
+            receivedUuids.end(),
+            std::inserter(
+                disappearedOperations, disappearedOperations.begin()));
+
+        foreach (const UUID& uuid, disappearedOperations) {
+          // TODO(bbannier): Instead of simply dropping an operation with
+          // `removeOfferOperation` here we should instead send a `Reconcile`
+          // message with a failed state to the resource provider so its status
+          // update manager can reliably deliver the operation status to the
+          // framework.
+          CHECK(offerOperations.contains(uuid));
+          removeOfferOperation(offerOperations.at(uuid));
+        }
+
+        // Handle offer operations known to the resource provider but
+        // not the agent. This can happen if the agent failed over and
+        // the resource provider reregistered.
+        hashset<UUID> reappearedOperations;
+        std::set_difference(
+            receivedUuids.begin(),
+            receivedUuids.end(),
+            knownUuids.begin(),
+            knownUuids.end(),
+            std::inserter(reappearedOperations, reappearedOperations.begin()));
+
+        foreach (const UUID& uuid, reappearedOperations) {
+          // Start tracking this offer operation.
+          //
+          // NOTE: We do not need to update total resources here as its
+          // state was sync explicitly with the received total above.
+          CHECK(receivedOfferOperations.contains(uuid));
+          addOfferOperation(
+              new OfferOperation(receivedOfferOperations.at(uuid)));
+        }
+
+        updated = true;
+      }
 
+      // Update resource version of this resource provider.
       const UUID& resourceVersionUuid =
         message->updateState->resourceVersionUuid;
 
-      if (resourceVersions.contains(resourceProviderId)) {
-        resourceVersions.at(resourceProviderId) = resourceVersionUuid;
-      } else {
-        resourceVersions.insert({resourceProviderId, resourceVersionUuid});
+      if (!resourceVersions.contains(resourceProviderId) ||
+          resourceVersions.at(resourceProviderId) != resourceVersionUuid) {
+        if (resourceVersions.contains(resourceProviderId)) {
+          resourceVersions.at(resourceProviderId) = resourceVersionUuid;
+        } else {
+          resourceVersions.insert({resourceProviderId, resourceVersionUuid});
+        }
+
+        updated = true;
       }
 
       // Send the updated resources to the master if the agent is running. Note
@@ -6827,23 +6935,32 @@ void Slave::handleResourceProviderMessage(
           break;
         }
         case RUNNING: {
-          LOG(INFO) << "Forwarding new total resources " << totalResources;
+          if (updated) {
+            LOG(INFO) << "Forwarding new total resources " << totalResources;
 
-          // Inform the master that the total capacity of this agent has
-          // changed.
-          UpdateSlaveMessage updateSlaveMessage;
-          updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id());
-          updateSlaveMessage.mutable_resource_categories()->set_total(true);
+            // Inform the master that the total capacity of this agent has
+            // changed.
+            UpdateSlaveMessage updateSlaveMessage;
+            updateSlaveMessage.mutable_slave_id()->CopyFrom(info.id());
+            updateSlaveMessage.mutable_resource_categories()->set_total(true);
 
-          updateSlaveMessage.mutable_total_resources()->CopyFrom(
-              totalResources);
+            updateSlaveMessage.mutable_total_resources()->CopyFrom(
+                totalResources);
 
-          updateSlaveMessage.mutable_resource_version_uuids()->CopyFrom(
-              protobuf::createResourceVersions(resourceVersions));
+            updateSlaveMessage.mutable_resource_version_uuids()->CopyFrom(
+                protobuf::createResourceVersions(resourceVersions));
 
-          send(master.get(), updateSlaveMessage);
+            UpdateSlaveMessage::OfferOperations* operations =
+              updateSlaveMessage.mutable_offer_operations();
 
-          break;
+            foreachvalue (const OfferOperation* operation, offerOperations) {
+              operations->add_operations()->CopyFrom(*operation);
+            }
+
+            send(master.get(), updateSlaveMessage);
+
+            break;
+          }
         }
       }
       break;


[6/8] mesos git commit: Introduced an allocator helper function to track used resources.

Posted by bb...@apache.org.
Introduced an allocator helper function to track used resources.

This patch introduces a helper to track allocated resources. It
encapsulates all needed updates to the various sorters for
reusability.

Review: https://reviews.apache.org/r/64136/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8c0f8a40
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8c0f8a40
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8c0f8a40

Branch: refs/heads/master
Commit: 8c0f8a40b13fba93b881da02c4b33a7d1c9f01ee
Parents: 42c14e1
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Thu Nov 30 17:03:45 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Nov 30 18:33:58 2017 +0100

----------------------------------------------------------------------
 src/master/allocator/mesos/hierarchical.cpp | 93 +++++++++++-------------
 src/master/allocator/mesos/hierarchical.hpp |  5 ++
 2 files changed, 49 insertions(+), 49 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8c0f8a40/src/master/allocator/mesos/hierarchical.cpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.cpp b/src/master/allocator/mesos/hierarchical.cpp
index 5ce9cea..6b0be6a 100644
--- a/src/master/allocator/mesos/hierarchical.cpp
+++ b/src/master/allocator/mesos/hierarchical.cpp
@@ -287,20 +287,7 @@ void HierarchicalAllocatorProcess::addFramework(
       continue;
     }
 
-    hashmap<string, Resources> allocations = resources.allocations();
-
-    foreachpair (const string& role, const Resources& allocation, allocations) {
-      roleSorter->allocated(role, slaveId, allocation);
-      frameworkSorters.at(role)->add(slaveId, allocation);
-      frameworkSorters.at(role)->allocated(
-          frameworkId.value(), slaveId, allocation);
-
-      if (quotas.contains(role)) {
-        // See comment at `quotaRoleSorter` declaration
-        // regarding non-revocable.
-        quotaRoleSorter->allocated(role, slaveId, allocation.nonRevocable());
-      }
-    }
+    trackAllocatedResources(slaveId, {{frameworkId, resources}});
   }
 
   LOG(INFO) << "Added framework " << frameworkId;
@@ -523,41 +510,7 @@ void HierarchicalAllocatorProcess::addSlave(
   // See comment at `quotaRoleSorter` declaration regarding non-revocable.
   quotaRoleSorter->add(slaveId, total.nonRevocable());
 
-  // Update the allocation for each framework.
-  foreachpair (const FrameworkID& frameworkId,
-               const Resources& used_,
-               used) {
-    if (!frameworks.contains(frameworkId)) {
-      continue;
-    }
-
-    foreachpair (const string& role,
-                 const Resources& allocated,
-                 used_.allocations()) {
-      // The framework has resources allocated to this role but it may
-      // or may not be subscribed to the role. Either way, we need to
-      // track the framework under the role.
-      if (!isFrameworkTrackedUnderRole(frameworkId, role)) {
-        trackFrameworkUnderRole(frameworkId, role);
-      }
-
-      // TODO(bmahler): Validate that the reserved resources have the
-      // framework's role.
-      CHECK(roleSorter->contains(role));
-      CHECK(frameworkSorters.contains(role));
-      CHECK(frameworkSorters.at(role)->contains(frameworkId.value()));
-
-      roleSorter->allocated(role, slaveId, allocated);
-      frameworkSorters.at(role)->add(slaveId, allocated);
-      frameworkSorters.at(role)->allocated(
-          frameworkId.value(), slaveId, allocated);
-
-      if (quotas.contains(role)) {
-        // See comment at `quotaRoleSorter` declaration regarding non-revocable.
-        quotaRoleSorter->allocated(role, slaveId, allocated.nonRevocable());
-      }
-    }
-  }
+  trackAllocatedResources(slaveId, used);
 
   slaves[slaveId] = Slave();
 
@@ -2419,6 +2372,48 @@ bool HierarchicalAllocatorProcess::isRemoteSlave(const Slave& slave) const
   return masterRegion != slaveRegion;
 }
 
+
+void HierarchicalAllocatorProcess::trackAllocatedResources(
+    const SlaveID& slaveId,
+    const hashmap<FrameworkID, Resources>& used)
+{
+  // Update the allocation for each framework.
+  foreachpair (const FrameworkID& frameworkId,
+               const Resources& used_,
+               used) {
+    if (!frameworks.contains(frameworkId)) {
+      continue;
+    }
+
+    foreachpair (const string& role,
+                 const Resources& allocated,
+                 used_.allocations()) {
+      // The framework has resources allocated to this role but it may
+      // or may not be subscribed to the role. Either way, we need to
+      // track the framework under the role.
+      if (!isFrameworkTrackedUnderRole(frameworkId, role)) {
+        trackFrameworkUnderRole(frameworkId, role);
+      }
+
+      // TODO(bmahler): Validate that the reserved resources have the
+      // framework's role.
+      CHECK(roleSorter->contains(role));
+      CHECK(frameworkSorters.contains(role));
+      CHECK(frameworkSorters.at(role)->contains(frameworkId.value()));
+
+      roleSorter->allocated(role, slaveId, allocated);
+      frameworkSorters.at(role)->add(slaveId, allocated);
+      frameworkSorters.at(role)->allocated(
+          frameworkId.value(), slaveId, allocated);
+
+      if (quotas.contains(role)) {
+        // See comment at `quotaRoleSorter` declaration regarding non-revocable.
+        quotaRoleSorter->allocated(role, slaveId, allocated.nonRevocable());
+      }
+    }
+  }
+}
+
 } // namespace internal {
 } // namespace allocator {
 } // namespace master {

http://git-wip-us.apache.org/repos/asf/mesos/blob/8c0f8a40/src/master/allocator/mesos/hierarchical.hpp
----------------------------------------------------------------------
diff --git a/src/master/allocator/mesos/hierarchical.hpp b/src/master/allocator/mesos/hierarchical.hpp
index 2c4832b..fd604c9 100644
--- a/src/master/allocator/mesos/hierarchical.hpp
+++ b/src/master/allocator/mesos/hierarchical.hpp
@@ -541,6 +541,11 @@ private:
   // different region than the master. This can only be the case if
   // the agent and the master are both configured with a fault domain.
   bool isRemoteSlave(const Slave& slave) const;
+
+  // Helper to track used resources on an agent.
+  void trackAllocatedResources(
+      const SlaveID& slaveId,
+      const hashmap<FrameworkID, Resources>& used);
 };