You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/03/14 12:44:53 UTC

[1/3] mesos git commit: Added comparison operators for operations.

Repository: mesos
Updated Branches:
  refs/heads/master 67cd0de36 -> 6fe66ce06


Added comparison operators for operations.

Review: https://reviews.apache.org/r/65589/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/69de3738
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/69de3738
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/69de3738

Branch: refs/heads/master
Commit: 69de373827964fd7aad12c0a65ed52f59585a7bb
Parents: 67cd0de
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Wed Mar 14 13:04:05 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Mar 14 13:04:05 2018 +0100

----------------------------------------------------------------------
 include/mesos/type_utils.hpp |  2 ++
 include/mesos/v1/mesos.hpp   |  2 ++
 src/common/type_utils.cpp    | 12 ++++++++++++
 src/v1/mesos.cpp             | 12 ++++++++++++
 4 files changed, 28 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/69de3738/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index df95319..19ea817 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -63,6 +63,7 @@ bool operator==(const Label& left, const Label& right);
 bool operator==(const Labels& left, const Labels& right);
 bool operator==(const MasterInfo& left, const MasterInfo& right);
 bool operator==(const Offer::Operation& left, const Offer::Operation& right);
+bool operator==(const Operation& left, const Operation& right);
 bool operator==(const OperationStatus& left, const OperationStatus& right);
 
 bool operator==(
@@ -86,6 +87,7 @@ bool operator!=(const CheckStatusInfo& left, const CheckStatusInfo& right);
 bool operator!=(const ExecutorInfo& left, const ExecutorInfo& right);
 bool operator!=(const Labels& left, const Labels& right);
 bool operator!=(const Offer::Operation& left, const Offer::Operation& right);
+bool operator!=(const Operation& left, const Operation& right);
 bool operator!=(const OperationStatus& left, const OperationStatus& right);
 
 bool operator!=(const TaskStatus& left, const TaskStatus& right);

http://git-wip-us.apache.org/repos/asf/mesos/blob/69de3738/include/mesos/v1/mesos.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.hpp b/include/mesos/v1/mesos.hpp
index 15723a2..fda3eb4 100644
--- a/include/mesos/v1/mesos.hpp
+++ b/include/mesos/v1/mesos.hpp
@@ -61,6 +61,7 @@ bool operator==(const Label& left, const Label& right);
 bool operator==(const Labels& left, const Labels& right);
 bool operator==(const MasterInfo& left, const MasterInfo& right);
 bool operator==(const Offer::Operation& left, const Offer::Operation& right);
+bool operator==(const Operation& left, const Operation& right);
 
 bool operator==(
     const ResourceProviderInfo& left,
@@ -77,6 +78,7 @@ bool operator==(const Volume& left, const Volume& right);
 
 bool operator!=(const Labels& left, const Labels& right);
 bool operator!=(const Offer::Operation& left, const Offer::Operation& right);
+bool operator!=(const Operation& left, const Operation& right);
 bool operator!=(const TaskStatus& left, const TaskStatus& right);
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/69de3738/src/common/type_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.cpp b/src/common/type_utils.cpp
index 3c2711b..33d6380 100644
--- a/src/common/type_utils.cpp
+++ b/src/common/type_utils.cpp
@@ -447,6 +447,12 @@ bool operator==(const Offer::Operation& left, const Offer::Operation& right)
 }
 
 
+bool operator==(const Operation& left, const Operation& right)
+{
+  return google::protobuf::util::MessageDifferencer::Equals(left, right);
+}
+
+
 bool operator==(const OperationStatus& left, const OperationStatus& right)
 {
   if (left.has_operation_id() != right.has_operation_id()) {
@@ -492,6 +498,12 @@ bool operator!=(const Offer::Operation& left, const Offer::Operation& right)
 }
 
 
+bool operator!=(const Operation& left, const Operation& right)
+{
+  return !(left == right);
+}
+
+
 bool operator!=(const OperationStatus& left, const OperationStatus& right)
 {
   return !(left == right);

http://git-wip-us.apache.org/repos/asf/mesos/blob/69de3738/src/v1/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/v1/mesos.cpp b/src/v1/mesos.cpp
index 1eb53f3..9b2df2d 100644
--- a/src/v1/mesos.cpp
+++ b/src/v1/mesos.cpp
@@ -403,12 +403,24 @@ bool operator==(const Offer::Operation& left, const Offer::Operation& right)
 }
 
 
+bool operator==(const Operation& left, const Operation& right)
+{
+  return google::protobuf::util::MessageDifferencer::Equals(left, right);
+}
+
+
 bool operator!=(const Offer::Operation& left, const Offer::Operation& right)
 {
   return !(left == right);
 }
 
 
+bool operator!=(const Operation& left, const Operation& right)
+{
+  return !(left == right);
+}
+
+
 bool operator==(
     const ResourceProviderInfo::Storage& left,
     const ResourceProviderInfo::Storage& right)


[3/3] mesos git commit: Explicitly tracked resource providers in master.

Posted by bb...@apache.org.
Explicitly tracked resource providers in master.

This patch adds explicit tracking of resource providers to the master
process. While we already explicitly sent resource provider
information in, e.g., `UpdateSlaveMessage`, up to now the master only
stored that information aggregated over the full agent.

Review: https://reviews.apache.org/r/65591/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6fe66ce0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6fe66ce0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6fe66ce0

Branch: refs/heads/master
Commit: 6fe66ce0625afa8aa4ab9815a0bb881ccf066e31
Parents: 729cb5b
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Wed Mar 14 13:04:14 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Mar 14 13:04:14 2018 +0100

----------------------------------------------------------------------
 src/common/protobuf_utils.cpp |   9 +-
 src/master/http.cpp           |   8 +
 src/master/master.cpp         | 685 ++++++++++++++++++-------------------
 src/master/master.hpp         |  40 ++-
 4 files changed, 385 insertions(+), 357 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6fe66ce0/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 9c5fb97..d2ada35 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -1272,13 +1272,12 @@ mesos::master::Response::GetAgents::Agent createAgentResponse(
       slave.capabilities.toRepeatedPtrField());
 
   foreachvalue (
-      const ResourceProviderInfo& resourceProviderInfo,
+      const mesos::internal::master::Slave::ResourceProvider& resourceProvider,
       slave.resourceProviders) {
-    mesos::master::Response::GetAgents::Agent::ResourceProvider*
-      resourceProvider = agent.add_resource_providers();
+    mesos::master::Response::GetAgents::Agent::ResourceProvider* provider =
+      agent.add_resource_providers();
 
-    resourceProvider->mutable_resource_provider_info()->CopyFrom(
-        resourceProviderInfo);
+    provider->mutable_resource_provider_info()->CopyFrom(resourceProvider.info);
   }
 
   return agent;

http://git-wip-us.apache.org/repos/asf/mesos/blob/6fe66ce0/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index cf03d8b..05fd7ce 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -3907,6 +3907,14 @@ Future<Response> Master::Http::getOperations(
     foreachvalue (Operation* operation, slave->operations) {
       operations->add_operations()->CopyFrom(*operation);
     }
+
+    foreachvalue (
+        const Slave::ResourceProvider resourceProvider,
+        slave->resourceProviders) {
+      foreachvalue (Operation* operation, resourceProvider.operations) {
+        operations->add_operations()->CopyFrom(*operation);
+      }
+    }
   }
 
   return OK(serialize(contentType, evolve(response)), stringify(contentType));

http://git-wip-us.apache.org/repos/asf/mesos/blob/6fe66ce0/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 223ebf2..8df7ad5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4942,8 +4942,21 @@ void Master::_accept(
             RunTaskMessage message;
             message.mutable_framework()->MergeFrom(framework->info);
 
+            hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+            if (slave->resourceVersion.isSome()) {
+              resourceVersions.put(None(), slave->resourceVersion.get());
+            }
+
+            foreachpair (
+                const ResourceProviderID& resourceProviderId,
+                const Slave::ResourceProvider& resourceProvider,
+                slave->resourceProviders) {
+              resourceVersions.put(
+                  resourceProviderId, resourceProvider.resourceVersion);
+            }
+
             message.mutable_resource_version_uuids()->CopyFrom(
-                protobuf::createResourceVersions(slave->resourceVersions));
+                protobuf::createResourceVersions(resourceVersions));
 
             // TODO(anand): We set 'pid' to UPID() for http frameworks
             // as 'pid' was made optional in 0.24.0. In 0.25.0, we
@@ -5129,8 +5142,21 @@ void Master::_accept(
         message.mutable_executor()->CopyFrom(executor);
         message.mutable_task_group()->CopyFrom(taskGroup);
 
+        hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+        if (slave->resourceVersion.isSome()) {
+          resourceVersions.put(None(), slave->resourceVersion.get());
+        }
+
+        foreachpair (
+            const ResourceProviderID& resourceProviderId,
+            const Slave::ResourceProvider& resourceProvider,
+            slave->resourceProviders) {
+          resourceVersions.put(
+              resourceProviderId, resourceProvider.resourceVersion);
+        }
+
         message.mutable_resource_version_uuids()->CopyFrom(
-            protobuf::createResourceVersions(slave->resourceVersions));
+            protobuf::createResourceVersions(resourceVersions));
 
         set<TaskID> taskIds;
         Resources totalResources;
@@ -7292,178 +7318,93 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
 
   // TODO(bbannier): We only need to update if any changes from
   // resource providers are reported.
-  bool updated =
-    slave->totalResources != newSlaveResources ||
-    message.has_resource_providers();
-
-  if (message.has_resource_version_uuid()) {
-    hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
-
-    resourceVersions.insert({None(), message.resource_version_uuid()});
-
-    foreach (
-        const UpdateSlaveMessage::ResourceProvider& resourceProvider,
-        message.resource_providers().providers()) {
-      if (!resourceProvider.has_info()) {
-        continue;
-      }
-
-      CHECK(resourceProvider.info().has_id());
-
-      const ResourceProviderID& resourceProviderId =
-        resourceProvider.info().id();
-
-      CHECK(!resourceVersions.contains(resourceProviderId));
-      resourceVersions.insert(
-          {resourceProviderId, resourceProvider.resource_version_uuid()});
-    }
+  bool updated = slave->totalResources != newSlaveResources;
 
-    updated = updated || slave->resourceVersions != resourceVersions;
-    slave->resourceVersions = resourceVersions;
+  // Check if the agent's resource version changed.
+  if (!updated && message.has_resource_version_uuid() &&
+      (slave->resourceVersion.isNone() ||
+       (slave->resourceVersion.isSome() &&
+        message.resource_version_uuid() != slave->resourceVersion.get()))) {
+    updated = true;
   }
 
   // Check if the known operations for this agent changed.
-  const hashset<UUID> knownOperations = slave->operations.keys();
-  hashset<UUID> receivedOperations;
-
-  foreach (const Operation& operation, message.operations().operations()) {
-    receivedOperations.insert(operation.uuid());
-  }
+  if (!updated) {
+    foreach (const Operation& operation, message.operations().operations()) {
+      if (!slave->operations.contains(operation.uuid())) {
+        updated = true;
+        break;
+      }
 
-  if (message.has_resource_providers()) {
-    foreach (
-        const UpdateSlaveMessage::ResourceProvider& resourceProvider,
-        message.resource_providers().providers()) {
-      foreach (const Operation& operation,
-               resourceProvider.operations().operations()) {
-        receivedOperations.insert(operation.uuid());
+      if (*slave->operations.at(operation.uuid()) != operation) {
+        updated = true;
+        break;
       }
     }
   }
 
-  updated = updated || knownOperations != receivedOperations;
-
-  if (!updated) {
-    LOG(INFO) << "Ignoring update on agent " << *slave
-              << " as it reports no changes";
-    return;
-  }
-
-  struct ResourceProvider
-  {
-    Option<Resources> oldTotal;
-    Option<Resources> newTotal;
-    Option<hashmap<UUID, Operation>> oldOperations;
-    Option<hashmap<UUID, Operation>> newOperations;
-    Option<ResourceProviderInfo> info;
-  };
-
-  // We store information on the different `ResourceProvider`s on this agent in
-  // a map, indexed by an optional provider id. Since the provider ID field for
-  // resources is only set for resources from true resource providers and is not
-  // set for agent default resources, the value for the key `None` points to
-  // information about the agent itself, not its resource providers.
-  hashmap<Option<ResourceProviderID>, ResourceProvider> resourceProviders;
-
-  // Group the resources and operation updates by resource provider.
-  {
-    // Process known resources.
-    auto groupResourcesByProviderId = [](const Resources& resources) {
-      hashmap<Option<ResourceProviderID>, Resources> result;
+  // Check if resource provider information changed.
+  if (!updated && message.has_resource_providers()) {
+    foreach (
+        const UpdateSlaveMessage::ResourceProvider& receivedProvider,
+        message.resource_providers().providers()) {
+      CHECK(receivedProvider.has_info());
+      CHECK(receivedProvider.info().has_id());
 
-      foreach (const Resource& resource, resources) {
-        Option<ResourceProviderID> providerId =
-          Resources::hasResourceProvider(resource)
-            ? resource.provider_id()
-            : Option<ResourceProviderID>::none();
+      const ResourceProviderID& resourceProviderId =
+        receivedProvider.info().id();
 
-        result[std::move(providerId)] += resource;
+      if (!slave->resourceProviders.contains(resourceProviderId)) {
+        updated = true;
+        break;
       }
 
-      return result;
-    };
+      const Slave::ResourceProvider& storedProvider =
+        slave->resourceProviders.at(resourceProviderId);
 
-    foreachpair (
-        const Option<ResourceProviderID>& providerId,
-        const Resources& resources,
-        groupResourcesByProviderId(slave->totalResources)) {
-      resourceProviders[providerId].oldTotal = resources;
-    }
-
-    // Process known operations.
-    foreachpair (const UUID& uuid,
-                 Operation* operation,
-                 slave->operations) {
-      Result<ResourceProviderID> providerId_ =
-        getResourceProviderId(operation->info());
-
-      CHECK(!providerId_.isError())
-        << "Failed to extract resource provider id from known operation: "
-        << providerId_.error();
-
-      Option<ResourceProviderID> providerId =
-        providerId_.isSome()
-          ? providerId_.get()
-          : Option<ResourceProviderID>::none();
-
-      // Set up an init empty list of existing operations. We might
-      // create a record for this resource provider if needed.
-      if (resourceProviders[providerId].oldOperations.isNone()) {
-        resourceProviders.at(providerId).oldOperations =
-          hashmap<UUID, Operation>();
+      if (storedProvider.info != receivedProvider.info() ||
+          storedProvider.totalResources != receivedProvider.total_resources() ||
+          storedProvider.resourceVersion !=
+            receivedProvider.resource_version_uuid()) {
+        updated = true;
+        break;
       }
 
-      resourceProviders.at(providerId)
-        .oldOperations->emplace(uuid, *operation);
-    }
-
-    // Explicitly add an entry for received agent resources.
-    resourceProviders[None()].newTotal =
-      newSlaveResources.filter(agentResources);
-
-    // Process received agent operations.
-    resourceProviders[None()].newOperations =
-      hashmap<UUID, Operation>();
-
-    foreach (const Operation& operation, message.operations().operations()) {
-      resourceProviders.at(None())
-        .newOperations->emplace(operation.uuid(), operation);
-    }
-
-    // Process explicitly received resource provider information.
-    if (message.has_resource_providers()) {
       foreach (
-          const UpdateSlaveMessage::ResourceProvider& resourceProvider,
-          message.resource_providers().providers()) {
-        CHECK(resourceProvider.has_info());
-        CHECK(resourceProvider.info().has_id());
-
-        ResourceProvider& provider =
-          resourceProviders[resourceProvider.info().id()];
-
-        provider.info = resourceProvider.info();
-
-        provider.newTotal = resourceProvider.total_resources();
-        if (provider.newOperations.isNone()) {
-          provider.newOperations = hashmap<UUID, Operation>();
+          const Operation& operation,
+          receivedProvider.operations().operations()) {
+        if (!storedProvider.operations.contains(operation.uuid())) {
+          updated = true;
+          break;
         }
 
-        foreach (const Operation& operation,
-                 resourceProvider.operations().operations()) {
-          provider.newOperations->emplace(operation.uuid(), operation);
+        if (*storedProvider.operations.at(operation.uuid()) != operation) {
+          updated = true;
+          break;
         }
       }
     }
   }
 
+  if (!updated) {
+    LOG(INFO) << "Ignoring update on agent " << *slave
+              << " as it reports no changes";
+    return;
+  }
+
   // Check invariants of the received update.
   {
-    foreachpair (
-        const Option<ResourceProviderID>& providerId,
-        const ResourceProvider& provider,
-        resourceProviders) {
-      if (providerId.isSome() &&
-          slave->resourceProviders.contains(providerId.get())) {
+    foreach (
+        const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+        message.resource_providers().providers()) {
+      CHECK(resourceProvider.has_info());
+      CHECK(resourceProvider.info().has_id());
+      const ResourceProviderID& providerId = resourceProvider.info().id();
+
+      const Option<Slave::ResourceProvider>& oldProvider =
+        slave->resourceProviders.get(providerId);
+
+      if (oldProvider.isSome()) {
         // For known resource providers the master should always know at least
         // as many non-terminal operations as the agent. While an
         // operation might get lost on the way to the agent or resource
@@ -7477,75 +7418,58 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
         // to the agent, but the agent fails over before it can process the
         // acknowledgement, or the agent initiates an unrelated
         // `UpdateSlaveMessage`.
-        auto extractPendingOperations =
-          [](const hashmap<UUID, Operation>& source,
-             hashset<UUID>* target) {
-            foreachpair (const UUID& uuid,
-                         const Operation& operation,
-                         source) {
-              if (!protobuf::isTerminalState(
-                      operation.latest_status().state())) {
-                target->insert(uuid);
-              }
-            }
-          };
+        foreach (
+            const Operation& operation,
+            resourceProvider.operations().operations()) {
+          if (!protobuf::isTerminalState(operation.latest_status().state())) {
+            CHECK(oldProvider->operations.contains(operation.uuid()))
+              << "Agent tried to reconcile unknown non-terminal operation "
+              << operation.uuid();
+          }
+        }
+      }
+    }
+  }
 
-        hashset<UUID> oldPendingOperations;
-        hashset<UUID> newPendingOperations;
+  // Update master and allocator state.
 
-        if (provider.oldOperations.isSome()) {
-          extractPendingOperations(
-              provider.oldOperations.get(), &oldPendingOperations);
-        }
+  if (hasOversubscribed) {
+    slave->totalResources -= slave->totalResources.revocable();
+    slave->totalResources += message.oversubscribed_resources();
 
-        if (provider.newOperations.isSome()) {
-          extractPendingOperations(
-              provider.newOperations.get(), &newPendingOperations);
-        }
+    // TODO(bbannier): Track oversubscribed resources for resource
+    // providers as well.
+  }
 
-        foreach (const UUID& uuid, newPendingOperations) {
-          CHECK(oldPendingOperations.contains(uuid))
-            << "Agent tried to reconcile unknown non-terminal operation "
-            << uuid;
-        }
-      }
+  ReconcileOperationsMessage reconcile;
 
-      if (providerId.isNone()) {
-        // We do not permit changes to agent (i.e., non-resource
-        // provider) non-revocable resources.
-        CHECK_SOME(provider.oldTotal);
-        CHECK_SOME(provider.newTotal);
-
-        Resources oldNonRevocable =
-          provider.oldTotal->nonRevocable().createStrippedScalarQuantity();
-        Resources newNonRevocable =
-          provider.newTotal->nonRevocable().createStrippedScalarQuantity();
-        CHECK_EQ(
-            provider.oldTotal->nonRevocable(),
-            provider.newTotal->nonRevocable());
-
-        // For agents only speculative operations can be reconciled.
-        //
-        // TODO(bbannier): Reconcile agent operations in
-        // `ReregisterSlaveMessage` in which case we expect agents to
-        // send the already known operations again here (possibly with
-        // changed status).
-        if (provider.newOperations.isSome()) {
-          foreachvalue (const Operation& operation,
-                        provider.newOperations.get()) {
-            CHECK(protobuf::isSpeculativeOperation(operation.info()));
-          }
-        }
-      }
+  // Reconcile operations on agent-default resources.
+  hashset<UUID> newOperations;
+  foreach (const Operation& operation, message.operations().operations()) {
+    newOperations.insert(operation.uuid());
+  }
+
+  foreachkey (const UUID& uuid, slave->operations) {
+    if (!message.has_operations() || !newOperations.contains(uuid)) {
+      LOG(WARNING) << "Performing explicit reconciliation with agent for"
+                   << " known operation " << uuid
+                   << " since it was not present in original"
+                   << " reconciliation message from agent";
+
+      ReconcileOperationsMessage::Operation* reconcileOperation =
+        reconcile.add_operations();
+
+      reconcileOperation->mutable_operation_uuid()->CopyFrom(uuid);
     }
   }
 
-  ReconcileOperationsMessage reconcile;
+  foreach (
+      const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+      message.resource_providers().providers()) {
+    CHECK(resourceProvider.has_info());
+    CHECK(resourceProvider.info().has_id());
+    const ResourceProviderID& providerId = resourceProvider.info().id();
 
-  // Update master and allocator state.
-  foreachpair (const Option<ResourceProviderID>& providerId,
-               const ResourceProvider& provider,
-               resourceProviders) {
     // Below we only add operations to our state from resource
     // providers which are unknown, or possibly remove them for known
     // resource providers.  This works since the master should always
@@ -7565,142 +7489,140 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
     // TODO(bbannier): We might want to consider to also learn about
     // new (terminal) operations when observing messages from status
     // update managers to frameworks.
-
-    if (providerId.isSome() &&
-        !slave->resourceProviders.contains(providerId.get())) {
+    if (!slave->resourceProviders.contains(providerId)) {
       // If this is a not previously seen resource provider we had a master
       // failover. Add the resources and operations to our state.
-      CHECK_SOME(provider.newTotal);
       CHECK(
-          provider.newTotal->empty() ||
-          !slave->totalResources.contains(provider.newTotal.get()));
-
-      CHECK_SOME(provider.info);
-      slave->resourceProviders.insert({providerId.get(), provider.info.get()});
-
-      slave->totalResources += provider.newTotal.get();
+          resourceProvider.total_resources().empty() ||
+          !slave->totalResources.contains(resourceProvider.total_resources()));
+
+      // We add the resource provider to the master first so
+      // that it can be found when e.g., adding operations.
+      slave->resourceProviders.put(
+          providerId,
+          {resourceProvider.info(),
+           resourceProvider.total_resources(),
+           resourceProvider.resource_version_uuid(),
+           {}});
 
       hashmap<FrameworkID, Resources> usedByOperations;
 
-      if (provider.newOperations.isSome()) {
-        foreachpair (const UUID& uuid,
-                     const Operation& operation,
-                     provider.newOperations.get()) {
-          // Update to bookkeeping of operations.
-          CHECK(!slave->operations.contains(uuid))
-            << "New operation " << uuid << " is already known";
-
-          Framework* framework = nullptr;
-          if (operation.has_framework_id()) {
-            framework = getFramework(operation.framework_id());
-          }
+      foreach (
+          const Operation& operation,
+          resourceProvider.operations().operations()) {
+        // Update to bookkeeping of operations.
+        Framework* framework = nullptr;
+        if (operation.has_framework_id()) {
+          framework = getFramework(operation.framework_id());
+        }
 
-          addOperation(framework, slave, new Operation(operation));
+        addOperation(framework, slave, new Operation(operation));
 
-          if (!protobuf::isTerminalState(operation.latest_status().state())) {
-            // If we do not yet know the `FrameworkInfo` of the framework the
-            // operation originated from, we cannot properly track the operation
-            // at this point.
-            //
-            // TODO(bbannier): Consider introducing ways of making
-            // sure an agent always knows the `FrameworkInfo` of
-            // operations triggered on its resources, e.g., by adding
-            // an explicit `FrameworkInfo` to operations like is
-            // already done for `RunTaskMessage`, see MESOS-8582.
-            if (framework == nullptr) {
-              LOG(WARNING)
-                << "Cannot properly account for operation " << operation.uuid()
-                << " learnt in reconciliation of agent " << slaveId
-                << " since framework " << operation.framework_id()
-                << " is unknown; this can lead to assertion failures after the"
-                   " operation terminates, see MESOS-8536";
-              continue;
-            }
+        if (!protobuf::isTerminalState(operation.latest_status().state())) {
+          // If we do not yet know the `FrameworkInfo` of the framework the
+          // operation originated from, we cannot properly track the operation
+          // at this point.
+          //
+          // TODO(bbannier): Consider introducing ways of making sure an agent
+          // always knows the `FrameworkInfo` of operations triggered on its
+          // resources, e.g., by adding an explicit `FrameworkInfo` to
+          // operations like is already done for `RunTaskMessage`, see
+          // MESOS-8582.
+          if (framework == nullptr) {
+            LOG(WARNING)
+              << "Cannot properly account for operation " << operation.uuid()
+              << " learnt in reconciliation of agent " << slaveId
+              << " since framework " << operation.framework_id()
+              << " is unknown; this can lead to assertion failures after the"
+              " operation terminates, see MESOS-8536";
+            continue;
+          }
 
-            Try<Resources> consumedResources =
-              protobuf::getConsumedResources(operation.info());
+          Try<Resources> consumedResources =
+            protobuf::getConsumedResources(operation.info());
 
-            CHECK_SOME(consumedResources)
-              << "Could not determine resources consumed by operation "
-              << operation.uuid();
+          CHECK_SOME(consumedResources)
+            << "Could not determine resources consumed by operation "
+            << operation.uuid();
 
-            usedByOperations[operation.framework_id()] +=
-              consumedResources.get();
-          }
+          usedByOperations[operation.framework_id()] +=
+            consumedResources.get();
         }
       }
 
+      slave->totalResources += resourceProvider.total_resources();
+
       allocator->addResourceProvider(
-          slaveId,
-          provider.newTotal.get(),
-          usedByOperations);
+          slaveId, resourceProvider.total_resources(), usedByOperations);
     } else {
-      // If this is a known resource provider or agent its total
-      // capacity cannot have changed, and it would not know about any
-      // non-terminal operations not already known to the master.
-      // However, it might not have received an operation for a couple
-      // different reasons:
+      // If this is a known resource provider its total capacity cannot have
+      // changed, and it would not know about any non-terminal operations not
+      // already known to the master.  However, it might not have received an
+      // operation for a couple different reasons:
+      //
       //   - The resource provider or agent could have failed over
       //     before the operation's `ApplyOperationMessage` could be
       //     received.
       //   - The operation's `ApplyOperationMessage` could have raced
       //     with this `UpdateSlaveMessage`.
       //
-      // In both of these cases, we need to reconcile such operations
-      // explicitly with the agent. For operations which the agent or
-      // resource provider does not recognize, an OPERATION_DROPPED
-      // status update will be generated and the master will remove
-      // the operation from its state upon receipt of that update.
-      if (provider.oldOperations.isSome()) {
-        foreachkey (const UUID& uuid, provider.oldOperations.get()) {
-          if (provider.newOperations.isNone() ||
-              !provider.newOperations->contains(uuid)) {
-            LOG(WARNING) << "Performing explicit reconciliation with agent for"
-                         << " known operation " << uuid
-                         << " since it was not present in original"
-                         << " reconciliation message from agent";
-
-            ReconcileOperationsMessage::Operation* reconcileOperation =
-              reconcile.add_operations();
-            reconcileOperation->mutable_operation_uuid()->CopyFrom(uuid);
-
-            if (providerId.isSome()) {
-              reconcileOperation->mutable_resource_provider_id()
-                ->CopyFrom(providerId.get());
-            }
-          }
+      // In both of these cases, we need to reconcile such operations explicitly
+      // with the agent. For operations which the agent or resource provider
+      // does not recognize, an OPERATION_DROPPED status update will be
+      // generated and the master will remove the operation from its state upon
+      // receipt of that update.
+      CHECK(slave->resourceProviders.contains(providerId));
+
+      Slave::ResourceProvider& oldProvider =
+        slave->resourceProviders.at(providerId);
+
+      hashmap<UUID, const Operation*> newOperations;
+      foreach (
+          const Operation& operation,
+          resourceProvider.operations().operations()) {
+        newOperations.put(operation.uuid(), &operation);
+      }
+
+      foreachpair (
+          const UUID& uuid, Operation* oldOperation, oldProvider.operations) {
+        if (!newOperations.contains(uuid)) {
+          LOG(WARNING) << "Performing explicit reconciliation with agent for"
+                       << " known operation " << uuid
+                       << " since it was not present in original"
+                       << " reconciliation message from agent";
 
+          ReconcileOperationsMessage::Operation* reconcileOperation =
+            reconcile.add_operations();
+
+          reconcileOperation->mutable_operation_uuid()->CopyFrom(uuid);
+          reconcileOperation->mutable_resource_provider_id()->CopyFrom(
+              providerId);
+        } else {
           // If a known operation became terminal between any previous offer
           // operation status update and this `UpdateSlaveMessage`, the total
           // resources we were sent already had the operation applied. We need
           // to update the state of the operation to terminal here so that any
           // update sent by the agent later does not cause us to apply the
           // operation again.
-          if (provider.newOperations.isSome() &&
-              provider.newOperations->contains(uuid)) {
-            Option<Operation> oldOperation = provider.oldOperations->get(uuid);
-            Option<Operation> newOperation = provider.newOperations->get(uuid);
-
-            CHECK_SOME(oldOperation);
-            CHECK_SOME(newOperation);
-
-            if (!protobuf::isTerminalState(
-                    oldOperation->latest_status().state()) &&
-                protobuf::isTerminalState(
-                    newOperation->latest_status().state())) {
-              Operation* operation = CHECK_NOTNULL(slave->getOperation(uuid));
-
-              UpdateOperationStatusMessage update =
-                protobuf::createUpdateOperationStatusMessage(
-                    uuid,
-                    newOperation->latest_status(),
-                    newOperation->latest_status(),
-                    operation->framework_id(),
-                    operation->slave_id());
-
-              updateOperation(
-                  operation, update, false); // Do not update resources.
-            }
+
+          const Operation* newOperation = newOperations.at(uuid);
+
+          if (!protobuf::isTerminalState(
+                  oldOperation->latest_status().state()) &&
+              protobuf::isTerminalState(
+                  newOperation->latest_status().state())) {
+            Operation* operation = CHECK_NOTNULL(slave->getOperation(uuid));
+
+            UpdateOperationStatusMessage update =
+              protobuf::createUpdateOperationStatusMessage(
+                  uuid,
+                  newOperation->latest_status(),
+                  newOperation->latest_status(),
+                  operation->framework_id(),
+                  operation->slave_id());
+
+            updateOperation(
+                operation, update, false); // Do not update resources.
           }
         }
       }
@@ -7709,14 +7631,18 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
       // speculated operations which are only visible in the total,
       // but never in the used resources. We explicitly allow for
       // resource providers to change from or to zero capacity.
-      if (provider.oldTotal.isSome()) {
-        CHECK(slave->totalResources.contains(provider.oldTotal.get()));
-        slave->totalResources -= provider.oldTotal.get();
-      }
+      const Resources oldResources =
+        slave->totalResources.filter([&providerId](const Resource& resource) {
+          return resource.provider_id() == providerId;
+        });
 
-      if (provider.newTotal.isSome()) {
-        slave->totalResources += provider.newTotal.get();
-      }
+      slave->totalResources -= oldResources;
+      slave->totalResources += resourceProvider.total_resources();
+
+      oldProvider.totalResources = resourceProvider.total_resources();
+
+      // Reconcile resource versions.
+      oldProvider.resourceVersion = resourceProvider.resource_version_uuid();
     }
   }
 
@@ -7749,9 +7675,13 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
     //
     // TODO(bbannier): Only rescind offers possibly containing
     // affected resources.
-    if (message.has_resource_providers()) {
-      LOG(INFO) << "Removing offer " << offer->id() << " with resources "
-                << offered << " on agent " << *slave;
+    const Resources offeredResourceProviderResources = offered.filter(
+        [](const Resource& resource) { return resource.has_provider_id(); });
+    if (message.has_resource_providers() &&
+        !offeredResourceProviderResources.empty()) {
+      LOG(INFO)
+        << "Removing offer " << offer->id()
+        << " with resources " << offered << " on agent " << *slave;
 
       rescind = true;
     }
@@ -10604,16 +10534,17 @@ void Master::_apply(
     // This must have been validated by the caller.
     CHECK(!resourceProviderId.isError());
 
-    Option<UUID> resourceVersion = resourceProviderId.isSome()
-      ? slave->resourceVersions.get(resourceProviderId.get())
-      : slave->resourceVersions.get(None());
+    CHECK(
+        resourceProviderId.isNone() ||
+        slave->resourceProviders.contains(resourceProviderId.get()))
+      << "Resource provider " + stringify(resourceProviderId.get()) +
+           " is unknown";
+
+    CHECK_SOME(slave->resourceVersion);
 
-    CHECK_SOME(resourceVersion)
-      << "Resource version of "
-      << (resourceProviderId.isSome()
-           ? "resource provider " + stringify(resourceProviderId.get())
-           : "agent " + stringify(*slave))
-      << " is unknown";
+    const UUID resourceVersion = resourceProviderId.isNone()
+      ? slave->resourceVersion.get()
+      : slave->resourceProviders.get(resourceProviderId.get())->resourceVersion;
 
     Operation* operation = new Operation(
         protobuf::createOperation(
@@ -10651,7 +10582,7 @@ void Master::_apply(
     }
 
     message.mutable_resource_version_uuid()->mutable_uuid()->CopyFrom(
-        resourceVersion.get());
+        resourceVersion);
 
     LOG(INFO) << "Sending operation '" << operation->info().id()
               << "' (uuid: " << operation->uuid() << ") "
@@ -11427,7 +11358,7 @@ Slave::Slave(
     vector<SlaveInfo::Capability> _capabilites,
     const Time& _registeredTime,
     vector<Resource> _checkpointedResources,
-    const Option<UUID>& resourceVersion,
+    const Option<UUID>& _resourceVersion,
     vector<ExecutorInfo> executorInfos,
     vector<Task> tasks)
   : master(_master),
@@ -11441,6 +11372,7 @@ Slave::Slave(
     connected(true),
     active(true),
     checkpointedResources(std::move(_checkpointedResources)),
+    resourceVersion(_resourceVersion),
     observer(nullptr)
 {
   CHECK(info.has_id());
@@ -11453,10 +11385,6 @@ Slave::Slave(
   CHECK_SOME(resources);
   totalResources = resources.get();
 
-  if (resourceVersion.isSome()) {
-    resourceVersions.put(None(), resourceVersion.get());
-  }
-
   foreach (ExecutorInfo& executorInfo, executorInfos) {
     CHECK(executorInfo.has_framework_id());
     addExecutor(executorInfo.framework_id(), std::move(executorInfo));
@@ -11579,7 +11507,21 @@ void Slave::removeTask(Task* task)
 
 void Slave::addOperation(Operation* operation)
 {
-  operations.put(operation->uuid(), operation);
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(operation->info());
+
+  CHECK(!resourceProviderId.isError()) << resourceProviderId.error();
+
+  if (resourceProviderId.isNone()) {
+    operations.put(operation->uuid(), operation);
+  } else {
+    CHECK(resourceProviders.contains(resourceProviderId.get()));
+
+    ResourceProvider& resourceProvider =
+      resourceProviders.at(resourceProviderId.get());
+
+    resourceProvider.operations.put(operation->uuid(), operation);
+  }
 
   if (!protobuf::isSpeculativeOperation(operation->info()) &&
       !protobuf::isTerminalState(operation->latest_status().state())) {
@@ -11629,16 +11571,38 @@ void Slave::removeOperation(Operation* operation)
 {
   const UUID& uuid = operation->uuid();
 
-  CHECK(operations.contains(uuid))
-    << "Unknown operation (uuid: " << uuid << ")"
-    << " to agent " << *this;
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(operation->info());
+
+  CHECK(!resourceProviderId.isError()) << resourceProviderId.error();
 
+  // Recover the resources used by this operation.
   if (!protobuf::isSpeculativeOperation(operation->info()) &&
       !protobuf::isTerminalState(operation->latest_status().state())) {
     recoverResources(operation);
   }
 
-  operations.erase(uuid);
+  // Remove the operation.
+  if (resourceProviderId.isNone()) {
+    CHECK(operations.contains(uuid))
+      << "Unknown operation (uuid: " << uuid << ")"
+      << " to agent " << *this;
+
+    operations.erase(operation->uuid());
+  } else {
+    CHECK(resourceProviders.contains(resourceProviderId.get()))
+      << "resource provider " << resourceProviderId.get() << " is unknown";
+
+    ResourceProvider& resourceProvider =
+      resourceProviders.at(resourceProviderId.get());
+
+    CHECK(resourceProvider.operations.contains(uuid))
+      << "Unknown operation (uuid: " << uuid << ")"
+      << " to resource provider " << resourceProviderId.get()
+      << " on agent " << *this;
+
+    resourceProvider.operations.erase(operation->uuid());
+  }
 }
 
 
@@ -11647,6 +11611,13 @@ Operation* Slave::getOperation(const UUID& uuid) const
   if (operations.contains(uuid)) {
     return operations.at(uuid);
   }
+
+  foreachvalue (const ResourceProvider& resourceProvider, resourceProviders) {
+    if (resourceProvider.operations.contains(uuid)) {
+      return resourceProvider.operations.at(uuid);
+    }
+  }
+
   return nullptr;
 }
 
@@ -11740,6 +11711,24 @@ void Slave::apply(const vector<ResourceConversion>& conversions)
   totalResources = resources.get();
 
   checkpointedResources = totalResources.filter(needCheckpointing);
+
+  // Also apply the conversion to the explicitly maintained resource
+  // provider resources.
+  foreach (const ResourceConversion& conversion, conversions) {
+    Result<ResourceProviderID> providerId = getResourceProviderId(conversion);
+
+    if (providerId.isNone()) {
+      continue;
+    }
+
+    CHECK_SOME(providerId);
+    CHECK(resourceProviders.contains(providerId.get()));
+    ResourceProvider& provider = resourceProviders.at(providerId.get());
+
+    CHECK(provider.totalResources.contains(conversion.consumed));
+    provider.totalResources -= conversion.consumed;
+    provider.totalResources += conversion.converted;
+  }
 }
 
 
@@ -11748,7 +11737,7 @@ Try<Nothing> Slave::update(
     const string& _version,
     const vector<SlaveInfo::Capability>& _capabilities,
     const Resources& _checkpointedResources,
-    const Option<UUID>& resourceVersion)
+    const Option<UUID>& _resourceVersion)
 {
   Try<Resources> resources = applyCheckpointedResources(
       _info.resources(),
@@ -11770,9 +11759,7 @@ Try<Nothing> Slave::update(
   // reregistering in this case.
   totalResources = resources.get();
 
-  if (resourceVersion.isSome()) {
-    resourceVersions.put(None(), resourceVersion.get());
-  }
+  resourceVersion = _resourceVersion;
 
   return Nothing();
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/6fe66ce0/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 8bf2c76..896995f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -124,7 +124,7 @@ Slave(Master* const _master,
         std::vector<SlaveInfo::Capability> _capabilites,
         const process::Time& _registeredTime,
         std::vector<Resource> _checkpointedResources,
-        const Option<UUID>& resourceVersion,
+        const Option<UUID>& _resourceVersion,
         std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(),
         std::vector<Task> tasks = std::vector<Task>());
 
@@ -276,10 +276,44 @@ Slave(Master* const _master,
   // providers as well.
   Resources totalResources;
 
+  // Used to establish the relationship between the operation and the
+  // resources that the operation is operating on. Each resource
+  // provider will keep a resource version UUID, and change it when it
+  // believes that the resources from this resource provider are out
+  // of sync with the master's view. The master will keep track of
+  // the last known resource version UUID for each resource provider,
+  // and attach the resource version UUID to each operation it sends
+  // out. The resource provider should reject operations that have a
+  // different resource version UUID than the one it maintains, because
+  // this means the operation is operating on resources that might
+  // have already been invalidated.
+  Option<UUID> resourceVersion;
+
   SlaveObserver* observer;
 
-  hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
-  hashmap<ResourceProviderID, ResourceProviderInfo> resourceProviders;
+  struct ResourceProvider {
+    ResourceProviderInfo info;
+    Resources totalResources;
+
+    // Used to establish the relationship between the operation and the
+    // resources that the operation is operating on. Each resource
+    // provider will keep a resource version UUID, and change it when it
+    // believes that the resources from this resource provider are out
+    // of sync with the master's view. The master will keep track of
+    // the last known resource version UUID for each resource provider,
+    // and attach the resource version UUID to each operation it sends
+    // out. The resource provider should reject operations that have a
+    // different resource version UUID than the one it maintains, because
+    // this means the operation is operating on resources that might
+    // have already been invalidated.
+    UUID resourceVersion;
+
+    // Pending operations or terminal operations that have
+    // unacknowledged status updates.
+    hashmap<UUID, Operation*> operations;
+  };
+
+  hashmap<ResourceProviderID, ResourceProvider> resourceProviders;
 
 private:
   Slave(const Slave&);              // No copying.


[2/3] mesos git commit: Added helper function to determine provider ID of a conversion.

Posted by bb...@apache.org.
Added helper function to determine provider ID of a conversion.

Review: https://reviews.apache.org/r/65590/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/729cb5b0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/729cb5b0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/729cb5b0

Branch: refs/heads/master
Commit: 729cb5b0b4fc03fc5a818a33a0be32c1e9940577
Parents: 69de373
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Wed Mar 14 13:04:10 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Mar 14 13:04:10 2018 +0100

----------------------------------------------------------------------
 src/common/resources_utils.cpp | 28 ++++++++++++++++++++++++++++
 src/common/resources_utils.hpp |  8 ++++++++
 2 files changed, 36 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/729cb5b0/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
index 99b16e0..9be01c1 100644
--- a/src/common/resources_utils.cpp
+++ b/src/common/resources_utils.cpp
@@ -285,6 +285,34 @@ Result<ResourceProviderID> getResourceProviderId(
 }
 
 
+Result<ResourceProviderID> getResourceProviderId(
+    const ResourceConversion& conversion)
+{
+  if (conversion.consumed.empty()) {
+    return Error("Could not determine resource provider");
+  }
+
+  const Resource& consumed = *conversion.consumed.begin();
+
+  const Option<ResourceProviderID> resourceProviderId =
+    consumed.has_provider_id()
+      ? consumed.provider_id()
+      : Option<ResourceProviderID>::none();
+
+
+  foreach (const Resource& resource, conversion.consumed) {
+    const Option<ResourceProviderID> resourceProviderId_ =
+      resource.has_provider_id()
+        ? resource.provider_id()
+        : Option<ResourceProviderID>::none();
+    if (resourceProviderId_ != resourceProviderId) {
+      return Error("Conversion works on multiple resource providers");
+    }
+  }
+
+  return resourceProviderId;
+}
+
 void convertResourceFormat(Resource* resource, ResourceFormat format)
 {
   switch (format) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/729cb5b0/src/common/resources_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.hpp b/src/common/resources_utils.hpp
index 73d070d..74cf747 100644
--- a/src/common/resources_utils.hpp
+++ b/src/common/resources_utils.hpp
@@ -58,6 +58,14 @@ Result<ResourceProviderID> getResourceProviderId(
     const Offer::Operation& operation);
 
 
+// Returns the ID of the resource provider affected by a resource
+// conversion. Returns None() if the conversion is on agent default
+// resources. We assume that a single conversion is only applied to
+// resources from a single resource provider.
+Result<ResourceProviderID> getResourceProviderId(
+    const ResourceConversion& conversion);
+
+
 // Returns the resource conversions from the given offer operation.
 // This helper assumes that the given operation has already been
 // validated.