You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/03/14 12:44:53 UTC
[1/3] mesos git commit: Added comparison operators for operations.
Repository: mesos
Updated Branches:
refs/heads/master 67cd0de36 -> 6fe66ce06
Added comparison operators for operations.
Review: https://reviews.apache.org/r/65589/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/69de3738
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/69de3738
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/69de3738
Branch: refs/heads/master
Commit: 69de373827964fd7aad12c0a65ed52f59585a7bb
Parents: 67cd0de
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Wed Mar 14 13:04:05 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Mar 14 13:04:05 2018 +0100
----------------------------------------------------------------------
include/mesos/type_utils.hpp | 2 ++
include/mesos/v1/mesos.hpp | 2 ++
src/common/type_utils.cpp | 12 ++++++++++++
src/v1/mesos.cpp | 12 ++++++++++++
4 files changed, 28 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/69de3738/include/mesos/type_utils.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/type_utils.hpp b/include/mesos/type_utils.hpp
index df95319..19ea817 100644
--- a/include/mesos/type_utils.hpp
+++ b/include/mesos/type_utils.hpp
@@ -63,6 +63,7 @@ bool operator==(const Label& left, const Label& right);
bool operator==(const Labels& left, const Labels& right);
bool operator==(const MasterInfo& left, const MasterInfo& right);
bool operator==(const Offer::Operation& left, const Offer::Operation& right);
+bool operator==(const Operation& left, const Operation& right);
bool operator==(const OperationStatus& left, const OperationStatus& right);
bool operator==(
@@ -86,6 +87,7 @@ bool operator!=(const CheckStatusInfo& left, const CheckStatusInfo& right);
bool operator!=(const ExecutorInfo& left, const ExecutorInfo& right);
bool operator!=(const Labels& left, const Labels& right);
bool operator!=(const Offer::Operation& left, const Offer::Operation& right);
+bool operator!=(const Operation& left, const Operation& right);
bool operator!=(const OperationStatus& left, const OperationStatus& right);
bool operator!=(const TaskStatus& left, const TaskStatus& right);
http://git-wip-us.apache.org/repos/asf/mesos/blob/69de3738/include/mesos/v1/mesos.hpp
----------------------------------------------------------------------
diff --git a/include/mesos/v1/mesos.hpp b/include/mesos/v1/mesos.hpp
index 15723a2..fda3eb4 100644
--- a/include/mesos/v1/mesos.hpp
+++ b/include/mesos/v1/mesos.hpp
@@ -61,6 +61,7 @@ bool operator==(const Label& left, const Label& right);
bool operator==(const Labels& left, const Labels& right);
bool operator==(const MasterInfo& left, const MasterInfo& right);
bool operator==(const Offer::Operation& left, const Offer::Operation& right);
+bool operator==(const Operation& left, const Operation& right);
bool operator==(
const ResourceProviderInfo& left,
@@ -77,6 +78,7 @@ bool operator==(const Volume& left, const Volume& right);
bool operator!=(const Labels& left, const Labels& right);
bool operator!=(const Offer::Operation& left, const Offer::Operation& right);
+bool operator!=(const Operation& left, const Operation& right);
bool operator!=(const TaskStatus& left, const TaskStatus& right);
http://git-wip-us.apache.org/repos/asf/mesos/blob/69de3738/src/common/type_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/type_utils.cpp b/src/common/type_utils.cpp
index 3c2711b..33d6380 100644
--- a/src/common/type_utils.cpp
+++ b/src/common/type_utils.cpp
@@ -447,6 +447,12 @@ bool operator==(const Offer::Operation& left, const Offer::Operation& right)
}
+bool operator==(const Operation& left, const Operation& right)
+{
+ return google::protobuf::util::MessageDifferencer::Equals(left, right);
+}
+
+
bool operator==(const OperationStatus& left, const OperationStatus& right)
{
if (left.has_operation_id() != right.has_operation_id()) {
@@ -492,6 +498,12 @@ bool operator!=(const Offer::Operation& left, const Offer::Operation& right)
}
+bool operator!=(const Operation& left, const Operation& right)
+{
+ return !(left == right);
+}
+
+
bool operator!=(const OperationStatus& left, const OperationStatus& right)
{
return !(left == right);
http://git-wip-us.apache.org/repos/asf/mesos/blob/69de3738/src/v1/mesos.cpp
----------------------------------------------------------------------
diff --git a/src/v1/mesos.cpp b/src/v1/mesos.cpp
index 1eb53f3..9b2df2d 100644
--- a/src/v1/mesos.cpp
+++ b/src/v1/mesos.cpp
@@ -403,12 +403,24 @@ bool operator==(const Offer::Operation& left, const Offer::Operation& right)
}
+bool operator==(const Operation& left, const Operation& right)
+{
+ return google::protobuf::util::MessageDifferencer::Equals(left, right);
+}
+
+
bool operator!=(const Offer::Operation& left, const Offer::Operation& right)
{
return !(left == right);
}
+bool operator!=(const Operation& left, const Operation& right)
+{
+ return !(left == right);
+}
+
+
bool operator==(
const ResourceProviderInfo::Storage& left,
const ResourceProviderInfo::Storage& right)
[3/3] mesos git commit: Explicitly tracked resource providers in
master.
Posted by bb...@apache.org.
Explicitly tracked resource providers in master.
This patch adds explicit tracking of resource providers to the master
process. While we already explicitly sent resource provider
information in, e.g., `UpdateSlaveMessage`, until now the master only
stored that information aggregated over the full agent.
Review: https://reviews.apache.org/r/65591/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6fe66ce0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6fe66ce0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6fe66ce0
Branch: refs/heads/master
Commit: 6fe66ce0625afa8aa4ab9815a0bb881ccf066e31
Parents: 729cb5b
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Wed Mar 14 13:04:14 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Mar 14 13:04:14 2018 +0100
----------------------------------------------------------------------
src/common/protobuf_utils.cpp | 9 +-
src/master/http.cpp | 8 +
src/master/master.cpp | 685 ++++++++++++++++++-------------------
src/master/master.hpp | 40 ++-
4 files changed, 385 insertions(+), 357 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6fe66ce0/src/common/protobuf_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/protobuf_utils.cpp b/src/common/protobuf_utils.cpp
index 9c5fb97..d2ada35 100644
--- a/src/common/protobuf_utils.cpp
+++ b/src/common/protobuf_utils.cpp
@@ -1272,13 +1272,12 @@ mesos::master::Response::GetAgents::Agent createAgentResponse(
slave.capabilities.toRepeatedPtrField());
foreachvalue (
- const ResourceProviderInfo& resourceProviderInfo,
+ const mesos::internal::master::Slave::ResourceProvider& resourceProvider,
slave.resourceProviders) {
- mesos::master::Response::GetAgents::Agent::ResourceProvider*
- resourceProvider = agent.add_resource_providers();
+ mesos::master::Response::GetAgents::Agent::ResourceProvider* provider =
+ agent.add_resource_providers();
- resourceProvider->mutable_resource_provider_info()->CopyFrom(
- resourceProviderInfo);
+ provider->mutable_resource_provider_info()->CopyFrom(resourceProvider.info);
}
return agent;
http://git-wip-us.apache.org/repos/asf/mesos/blob/6fe66ce0/src/master/http.cpp
----------------------------------------------------------------------
diff --git a/src/master/http.cpp b/src/master/http.cpp
index cf03d8b..05fd7ce 100644
--- a/src/master/http.cpp
+++ b/src/master/http.cpp
@@ -3907,6 +3907,14 @@ Future<Response> Master::Http::getOperations(
foreachvalue (Operation* operation, slave->operations) {
operations->add_operations()->CopyFrom(*operation);
}
+
+ foreachvalue (
+ const Slave::ResourceProvider resourceProvider,
+ slave->resourceProviders) {
+ foreachvalue (Operation* operation, resourceProvider.operations) {
+ operations->add_operations()->CopyFrom(*operation);
+ }
+ }
}
return OK(serialize(contentType, evolve(response)), stringify(contentType));
http://git-wip-us.apache.org/repos/asf/mesos/blob/6fe66ce0/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 223ebf2..8df7ad5 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4942,8 +4942,21 @@ void Master::_accept(
RunTaskMessage message;
message.mutable_framework()->MergeFrom(framework->info);
+ hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+ if (slave->resourceVersion.isSome()) {
+ resourceVersions.put(None(), slave->resourceVersion.get());
+ }
+
+ foreachpair (
+ const ResourceProviderID& resourceProviderId,
+ const Slave::ResourceProvider& resourceProvider,
+ slave->resourceProviders) {
+ resourceVersions.put(
+ resourceProviderId, resourceProvider.resourceVersion);
+ }
+
message.mutable_resource_version_uuids()->CopyFrom(
- protobuf::createResourceVersions(slave->resourceVersions));
+ protobuf::createResourceVersions(resourceVersions));
// TODO(anand): We set 'pid' to UPID() for http frameworks
// as 'pid' was made optional in 0.24.0. In 0.25.0, we
@@ -5129,8 +5142,21 @@ void Master::_accept(
message.mutable_executor()->CopyFrom(executor);
message.mutable_task_group()->CopyFrom(taskGroup);
+ hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
+ if (slave->resourceVersion.isSome()) {
+ resourceVersions.put(None(), slave->resourceVersion.get());
+ }
+
+ foreachpair (
+ const ResourceProviderID& resourceProviderId,
+ const Slave::ResourceProvider& resourceProvider,
+ slave->resourceProviders) {
+ resourceVersions.put(
+ resourceProviderId, resourceProvider.resourceVersion);
+ }
+
message.mutable_resource_version_uuids()->CopyFrom(
- protobuf::createResourceVersions(slave->resourceVersions));
+ protobuf::createResourceVersions(resourceVersions));
set<TaskID> taskIds;
Resources totalResources;
@@ -7292,178 +7318,93 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
// TODO(bbannier): We only need to update if any changes from
// resource providers are reported.
- bool updated =
- slave->totalResources != newSlaveResources ||
- message.has_resource_providers();
-
- if (message.has_resource_version_uuid()) {
- hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
-
- resourceVersions.insert({None(), message.resource_version_uuid()});
-
- foreach (
- const UpdateSlaveMessage::ResourceProvider& resourceProvider,
- message.resource_providers().providers()) {
- if (!resourceProvider.has_info()) {
- continue;
- }
-
- CHECK(resourceProvider.info().has_id());
-
- const ResourceProviderID& resourceProviderId =
- resourceProvider.info().id();
-
- CHECK(!resourceVersions.contains(resourceProviderId));
- resourceVersions.insert(
- {resourceProviderId, resourceProvider.resource_version_uuid()});
- }
+ bool updated = slave->totalResources != newSlaveResources;
- updated = updated || slave->resourceVersions != resourceVersions;
- slave->resourceVersions = resourceVersions;
+ // Check if the agent's resource version changed.
+ if (!updated && message.has_resource_version_uuid() &&
+ (slave->resourceVersion.isNone() ||
+ (slave->resourceVersion.isSome() &&
+ message.resource_version_uuid() != slave->resourceVersion.get()))) {
+ updated = true;
}
// Check if the known operations for this agent changed.
- const hashset<UUID> knownOperations = slave->operations.keys();
- hashset<UUID> receivedOperations;
-
- foreach (const Operation& operation, message.operations().operations()) {
- receivedOperations.insert(operation.uuid());
- }
+ if (!updated) {
+ foreach (const Operation& operation, message.operations().operations()) {
+ if (!slave->operations.contains(operation.uuid())) {
+ updated = true;
+ break;
+ }
- if (message.has_resource_providers()) {
- foreach (
- const UpdateSlaveMessage::ResourceProvider& resourceProvider,
- message.resource_providers().providers()) {
- foreach (const Operation& operation,
- resourceProvider.operations().operations()) {
- receivedOperations.insert(operation.uuid());
+ if (*slave->operations.at(operation.uuid()) != operation) {
+ updated = true;
+ break;
}
}
}
- updated = updated || knownOperations != receivedOperations;
-
- if (!updated) {
- LOG(INFO) << "Ignoring update on agent " << *slave
- << " as it reports no changes";
- return;
- }
-
- struct ResourceProvider
- {
- Option<Resources> oldTotal;
- Option<Resources> newTotal;
- Option<hashmap<UUID, Operation>> oldOperations;
- Option<hashmap<UUID, Operation>> newOperations;
- Option<ResourceProviderInfo> info;
- };
-
- // We store information on the different `ResourceProvider`s on this agent in
- // a map, indexed by an optional provider id. Since the provider ID field for
- // resources is only set for resources from true resource providers and is not
- // set for agent default resources, the value for the key `None` points to
- // information about the agent itself, not its resource providers.
- hashmap<Option<ResourceProviderID>, ResourceProvider> resourceProviders;
-
- // Group the resources and operation updates by resource provider.
- {
- // Process known resources.
- auto groupResourcesByProviderId = [](const Resources& resources) {
- hashmap<Option<ResourceProviderID>, Resources> result;
+ // Check if resource provider information changed.
+ if (!updated && message.has_resource_providers()) {
+ foreach (
+ const UpdateSlaveMessage::ResourceProvider& receivedProvider,
+ message.resource_providers().providers()) {
+ CHECK(receivedProvider.has_info());
+ CHECK(receivedProvider.info().has_id());
- foreach (const Resource& resource, resources) {
- Option<ResourceProviderID> providerId =
- Resources::hasResourceProvider(resource)
- ? resource.provider_id()
- : Option<ResourceProviderID>::none();
+ const ResourceProviderID& resourceProviderId =
+ receivedProvider.info().id();
- result[std::move(providerId)] += resource;
+ if (!slave->resourceProviders.contains(resourceProviderId)) {
+ updated = true;
+ break;
}
- return result;
- };
+ const Slave::ResourceProvider& storedProvider =
+ slave->resourceProviders.at(resourceProviderId);
- foreachpair (
- const Option<ResourceProviderID>& providerId,
- const Resources& resources,
- groupResourcesByProviderId(slave->totalResources)) {
- resourceProviders[providerId].oldTotal = resources;
- }
-
- // Process known operations.
- foreachpair (const UUID& uuid,
- Operation* operation,
- slave->operations) {
- Result<ResourceProviderID> providerId_ =
- getResourceProviderId(operation->info());
-
- CHECK(!providerId_.isError())
- << "Failed to extract resource provider id from known operation: "
- << providerId_.error();
-
- Option<ResourceProviderID> providerId =
- providerId_.isSome()
- ? providerId_.get()
- : Option<ResourceProviderID>::none();
-
- // Set up an init empty list of existing operations. We might
- // create a record for this resource provider if needed.
- if (resourceProviders[providerId].oldOperations.isNone()) {
- resourceProviders.at(providerId).oldOperations =
- hashmap<UUID, Operation>();
+ if (storedProvider.info != receivedProvider.info() ||
+ storedProvider.totalResources != receivedProvider.total_resources() ||
+ storedProvider.resourceVersion !=
+ receivedProvider.resource_version_uuid()) {
+ updated = true;
+ break;
}
- resourceProviders.at(providerId)
- .oldOperations->emplace(uuid, *operation);
- }
-
- // Explicitly add an entry for received agent resources.
- resourceProviders[None()].newTotal =
- newSlaveResources.filter(agentResources);
-
- // Process received agent operations.
- resourceProviders[None()].newOperations =
- hashmap<UUID, Operation>();
-
- foreach (const Operation& operation, message.operations().operations()) {
- resourceProviders.at(None())
- .newOperations->emplace(operation.uuid(), operation);
- }
-
- // Process explicitly received resource provider information.
- if (message.has_resource_providers()) {
foreach (
- const UpdateSlaveMessage::ResourceProvider& resourceProvider,
- message.resource_providers().providers()) {
- CHECK(resourceProvider.has_info());
- CHECK(resourceProvider.info().has_id());
-
- ResourceProvider& provider =
- resourceProviders[resourceProvider.info().id()];
-
- provider.info = resourceProvider.info();
-
- provider.newTotal = resourceProvider.total_resources();
- if (provider.newOperations.isNone()) {
- provider.newOperations = hashmap<UUID, Operation>();
+ const Operation& operation,
+ receivedProvider.operations().operations()) {
+ if (!storedProvider.operations.contains(operation.uuid())) {
+ updated = true;
+ break;
}
- foreach (const Operation& operation,
- resourceProvider.operations().operations()) {
- provider.newOperations->emplace(operation.uuid(), operation);
+ if (*storedProvider.operations.at(operation.uuid()) != operation) {
+ updated = true;
+ break;
}
}
}
}
+ if (!updated) {
+ LOG(INFO) << "Ignoring update on agent " << *slave
+ << " as it reports no changes";
+ return;
+ }
+
// Check invariants of the received update.
{
- foreachpair (
- const Option<ResourceProviderID>& providerId,
- const ResourceProvider& provider,
- resourceProviders) {
- if (providerId.isSome() &&
- slave->resourceProviders.contains(providerId.get())) {
+ foreach (
+ const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+ message.resource_providers().providers()) {
+ CHECK(resourceProvider.has_info());
+ CHECK(resourceProvider.info().has_id());
+ const ResourceProviderID& providerId = resourceProvider.info().id();
+
+ const Option<Slave::ResourceProvider>& oldProvider =
+ slave->resourceProviders.get(providerId);
+
+ if (oldProvider.isSome()) {
// For known resource providers the master should always know at least
// as many non-terminal operations as the agent. While an
// operation might get lost on the way to the agent or resource
@@ -7477,75 +7418,58 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
// to the agent, but the agent fails over before it can process the
// acknowledgement, or the agent initiates an unrelated
// `UpdateSlaveMessage`.
- auto extractPendingOperations =
- [](const hashmap<UUID, Operation>& source,
- hashset<UUID>* target) {
- foreachpair (const UUID& uuid,
- const Operation& operation,
- source) {
- if (!protobuf::isTerminalState(
- operation.latest_status().state())) {
- target->insert(uuid);
- }
- }
- };
+ foreach (
+ const Operation& operation,
+ resourceProvider.operations().operations()) {
+ if (!protobuf::isTerminalState(operation.latest_status().state())) {
+ CHECK(oldProvider->operations.contains(operation.uuid()))
+ << "Agent tried to reconcile unknown non-terminal operation "
+ << operation.uuid();
+ }
+ }
+ }
+ }
+ }
- hashset<UUID> oldPendingOperations;
- hashset<UUID> newPendingOperations;
+ // Update master and allocator state.
- if (provider.oldOperations.isSome()) {
- extractPendingOperations(
- provider.oldOperations.get(), &oldPendingOperations);
- }
+ if (hasOversubscribed) {
+ slave->totalResources -= slave->totalResources.revocable();
+ slave->totalResources += message.oversubscribed_resources();
- if (provider.newOperations.isSome()) {
- extractPendingOperations(
- provider.newOperations.get(), &newPendingOperations);
- }
+ // TODO(bbannier): Track oversubscribed resources for resource
+ // providers as well.
+ }
- foreach (const UUID& uuid, newPendingOperations) {
- CHECK(oldPendingOperations.contains(uuid))
- << "Agent tried to reconcile unknown non-terminal operation "
- << uuid;
- }
- }
+ ReconcileOperationsMessage reconcile;
- if (providerId.isNone()) {
- // We do not permit changes to agent (i.e., non-resource
- // provider) non-revocable resources.
- CHECK_SOME(provider.oldTotal);
- CHECK_SOME(provider.newTotal);
-
- Resources oldNonRevocable =
- provider.oldTotal->nonRevocable().createStrippedScalarQuantity();
- Resources newNonRevocable =
- provider.newTotal->nonRevocable().createStrippedScalarQuantity();
- CHECK_EQ(
- provider.oldTotal->nonRevocable(),
- provider.newTotal->nonRevocable());
-
- // For agents only speculative operations can be reconciled.
- //
- // TODO(bbannier): Reconcile agent operations in
- // `ReregisterSlaveMessage` in which case we expect agents to
- // send the already known operations again here (possibly with
- // changed status).
- if (provider.newOperations.isSome()) {
- foreachvalue (const Operation& operation,
- provider.newOperations.get()) {
- CHECK(protobuf::isSpeculativeOperation(operation.info()));
- }
- }
- }
+ // Reconcile operations on agent-default resources.
+ hashset<UUID> newOperations;
+ foreach (const Operation& operation, message.operations().operations()) {
+ newOperations.insert(operation.uuid());
+ }
+
+ foreachkey (const UUID& uuid, slave->operations) {
+ if (!message.has_operations() || !newOperations.contains(uuid)) {
+ LOG(WARNING) << "Performing explicit reconciliation with agent for"
+ << " known operation " << uuid
+ << " since it was not present in original"
+ << " reconciliation message from agent";
+
+ ReconcileOperationsMessage::Operation* reconcileOperation =
+ reconcile.add_operations();
+
+ reconcileOperation->mutable_operation_uuid()->CopyFrom(uuid);
}
}
- ReconcileOperationsMessage reconcile;
+ foreach (
+ const UpdateSlaveMessage::ResourceProvider& resourceProvider,
+ message.resource_providers().providers()) {
+ CHECK(resourceProvider.has_info());
+ CHECK(resourceProvider.info().has_id());
+ const ResourceProviderID& providerId = resourceProvider.info().id();
- // Update master and allocator state.
- foreachpair (const Option<ResourceProviderID>& providerId,
- const ResourceProvider& provider,
- resourceProviders) {
// Below we only add operations to our state from resource
// providers which are unknown, or possibly remove them for known
// resource providers. This works since the master should always
@@ -7565,142 +7489,140 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
// TODO(bbannier): We might want to consider to also learn about
// new (terminal) operations when observing messages from status
// update managers to frameworks.
-
- if (providerId.isSome() &&
- !slave->resourceProviders.contains(providerId.get())) {
+ if (!slave->resourceProviders.contains(providerId)) {
// If this is a not previously seen resource provider we had a master
// failover. Add the resources and operations to our state.
- CHECK_SOME(provider.newTotal);
CHECK(
- provider.newTotal->empty() ||
- !slave->totalResources.contains(provider.newTotal.get()));
-
- CHECK_SOME(provider.info);
- slave->resourceProviders.insert({providerId.get(), provider.info.get()});
-
- slave->totalResources += provider.newTotal.get();
+ resourceProvider.total_resources().empty() ||
+ !slave->totalResources.contains(resourceProvider.total_resources()));
+
+ // We add the resource provider to the master first so
+ // that it can be found when e.g., adding operations.
+ slave->resourceProviders.put(
+ providerId,
+ {resourceProvider.info(),
+ resourceProvider.total_resources(),
+ resourceProvider.resource_version_uuid(),
+ {}});
hashmap<FrameworkID, Resources> usedByOperations;
- if (provider.newOperations.isSome()) {
- foreachpair (const UUID& uuid,
- const Operation& operation,
- provider.newOperations.get()) {
- // Update to bookkeeping of operations.
- CHECK(!slave->operations.contains(uuid))
- << "New operation " << uuid << " is already known";
-
- Framework* framework = nullptr;
- if (operation.has_framework_id()) {
- framework = getFramework(operation.framework_id());
- }
+ foreach (
+ const Operation& operation,
+ resourceProvider.operations().operations()) {
+ // Update to bookkeeping of operations.
+ Framework* framework = nullptr;
+ if (operation.has_framework_id()) {
+ framework = getFramework(operation.framework_id());
+ }
- addOperation(framework, slave, new Operation(operation));
+ addOperation(framework, slave, new Operation(operation));
- if (!protobuf::isTerminalState(operation.latest_status().state())) {
- // If we do not yet know the `FrameworkInfo` of the framework the
- // operation originated from, we cannot properly track the operation
- // at this point.
- //
- // TODO(bbannier): Consider introducing ways of making
- // sure an agent always knows the `FrameworkInfo` of
- // operations triggered on its resources, e.g., by adding
- // an explicit `FrameworkInfo` to operations like is
- // already done for `RunTaskMessage`, see MESOS-8582.
- if (framework == nullptr) {
- LOG(WARNING)
- << "Cannot properly account for operation " << operation.uuid()
- << " learnt in reconciliation of agent " << slaveId
- << " since framework " << operation.framework_id()
- << " is unknown; this can lead to assertion failures after the"
- " operation terminates, see MESOS-8536";
- continue;
- }
+ if (!protobuf::isTerminalState(operation.latest_status().state())) {
+ // If we do not yet know the `FrameworkInfo` of the framework the
+ // operation originated from, we cannot properly track the operation
+ // at this point.
+ //
+ // TODO(bbannier): Consider introducing ways of making sure an agent
+ // always knows the `FrameworkInfo` of operations triggered on its
+ // resources, e.g., by adding an explicit `FrameworkInfo` to
+ // operations like is already done for `RunTaskMessage`, see
+ // MESOS-8582.
+ if (framework == nullptr) {
+ LOG(WARNING)
+ << "Cannot properly account for operation " << operation.uuid()
+ << " learnt in reconciliation of agent " << slaveId
+ << " since framework " << operation.framework_id()
+ << " is unknown; this can lead to assertion failures after the"
+ " operation terminates, see MESOS-8536";
+ continue;
+ }
- Try<Resources> consumedResources =
- protobuf::getConsumedResources(operation.info());
+ Try<Resources> consumedResources =
+ protobuf::getConsumedResources(operation.info());
- CHECK_SOME(consumedResources)
- << "Could not determine resources consumed by operation "
- << operation.uuid();
+ CHECK_SOME(consumedResources)
+ << "Could not determine resources consumed by operation "
+ << operation.uuid();
- usedByOperations[operation.framework_id()] +=
- consumedResources.get();
- }
+ usedByOperations[operation.framework_id()] +=
+ consumedResources.get();
}
}
+ slave->totalResources += resourceProvider.total_resources();
+
allocator->addResourceProvider(
- slaveId,
- provider.newTotal.get(),
- usedByOperations);
+ slaveId, resourceProvider.total_resources(), usedByOperations);
} else {
- // If this is a known resource provider or agent its total
- // capacity cannot have changed, and it would not know about any
- // non-terminal operations not already known to the master.
- // However, it might not have received an operation for a couple
- // different reasons:
+ // If this is a known resource provider its total capacity cannot have
+ // changed, and it would not know about any non-terminal operations not
+ // already known to the master. However, it might not have received an
+ // operation for a couple different reasons:
+ //
// - The resource provider or agent could have failed over
// before the operation's `ApplyOperationMessage` could be
// received.
// - The operation's `ApplyOperationMessage` could have raced
// with this `UpdateSlaveMessage`.
//
- // In both of these cases, we need to reconcile such operations
- // explicitly with the agent. For operations which the agent or
- // resource provider does not recognize, an OPERATION_DROPPED
- // status update will be generated and the master will remove
- // the operation from its state upon receipt of that update.
- if (provider.oldOperations.isSome()) {
- foreachkey (const UUID& uuid, provider.oldOperations.get()) {
- if (provider.newOperations.isNone() ||
- !provider.newOperations->contains(uuid)) {
- LOG(WARNING) << "Performing explicit reconciliation with agent for"
- << " known operation " << uuid
- << " since it was not present in original"
- << " reconciliation message from agent";
-
- ReconcileOperationsMessage::Operation* reconcileOperation =
- reconcile.add_operations();
- reconcileOperation->mutable_operation_uuid()->CopyFrom(uuid);
-
- if (providerId.isSome()) {
- reconcileOperation->mutable_resource_provider_id()
- ->CopyFrom(providerId.get());
- }
- }
+ // In both of these cases, we need to reconcile such operations explicitly
+ // with the agent. For operations which the agent or resource provider
+ // does not recognize, an OPERATION_DROPPED status update will be
+ // generated and the master will remove the operation from its state upon
+ // receipt of that update.
+ CHECK(slave->resourceProviders.contains(providerId));
+
+ Slave::ResourceProvider& oldProvider =
+ slave->resourceProviders.at(providerId);
+
+ hashmap<UUID, const Operation*> newOperations;
+ foreach (
+ const Operation& operation,
+ resourceProvider.operations().operations()) {
+ newOperations.put(operation.uuid(), &operation);
+ }
+
+ foreachpair (
+ const UUID& uuid, Operation* oldOperation, oldProvider.operations) {
+ if (!newOperations.contains(uuid)) {
+ LOG(WARNING) << "Performing explicit reconciliation with agent for"
+ << " known operation " << uuid
+ << " since it was not present in original"
+ << " reconciliation message from agent";
+ ReconcileOperationsMessage::Operation* reconcileOperation =
+ reconcile.add_operations();
+
+ reconcileOperation->mutable_operation_uuid()->CopyFrom(uuid);
+ reconcileOperation->mutable_resource_provider_id()->CopyFrom(
+ providerId);
+ } else {
// If a known operation became terminal between any previous offer
// operation status update and this `UpdateSlaveMessage`, the total
// resources we were sent already had the operation applied. We need
// to update the state of the operation to terminal here so that any
// update sent by the agent later does not cause us to apply the
// operation again.
- if (provider.newOperations.isSome() &&
- provider.newOperations->contains(uuid)) {
- Option<Operation> oldOperation = provider.oldOperations->get(uuid);
- Option<Operation> newOperation = provider.newOperations->get(uuid);
-
- CHECK_SOME(oldOperation);
- CHECK_SOME(newOperation);
-
- if (!protobuf::isTerminalState(
- oldOperation->latest_status().state()) &&
- protobuf::isTerminalState(
- newOperation->latest_status().state())) {
- Operation* operation = CHECK_NOTNULL(slave->getOperation(uuid));
-
- UpdateOperationStatusMessage update =
- protobuf::createUpdateOperationStatusMessage(
- uuid,
- newOperation->latest_status(),
- newOperation->latest_status(),
- operation->framework_id(),
- operation->slave_id());
-
- updateOperation(
- operation, update, false); // Do not update resources.
- }
+
+ const Operation* newOperation = newOperations.at(uuid);
+
+ if (!protobuf::isTerminalState(
+ oldOperation->latest_status().state()) &&
+ protobuf::isTerminalState(
+ newOperation->latest_status().state())) {
+ Operation* operation = CHECK_NOTNULL(slave->getOperation(uuid));
+
+ UpdateOperationStatusMessage update =
+ protobuf::createUpdateOperationStatusMessage(
+ uuid,
+ newOperation->latest_status(),
+ newOperation->latest_status(),
+ operation->framework_id(),
+ operation->slave_id());
+
+ updateOperation(
+ operation, update, false); // Do not update resources.
}
}
}
@@ -7709,14 +7631,18 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
// speculated operations which are only visible in the total,
// but never in the used resources. We explicitly allow for
// resource providers to change from or to zero capacity.
- if (provider.oldTotal.isSome()) {
- CHECK(slave->totalResources.contains(provider.oldTotal.get()));
- slave->totalResources -= provider.oldTotal.get();
- }
+ const Resources oldResources =
+ slave->totalResources.filter([&providerId](const Resource& resource) {
+ return resource.provider_id() == providerId;
+ });
- if (provider.newTotal.isSome()) {
- slave->totalResources += provider.newTotal.get();
- }
+ slave->totalResources -= oldResources;
+ slave->totalResources += resourceProvider.total_resources();
+
+ oldProvider.totalResources = resourceProvider.total_resources();
+
+ // Reconcile resource versions.
+ oldProvider.resourceVersion = resourceProvider.resource_version_uuid();
}
}
@@ -7749,9 +7675,13 @@ void Master::updateSlave(UpdateSlaveMessage&& message)
//
// TODO(bbannier): Only rescind offers possibly containing
// affected resources.
- if (message.has_resource_providers()) {
- LOG(INFO) << "Removing offer " << offer->id() << " with resources "
- << offered << " on agent " << *slave;
+ const Resources offeredResourceProviderResources = offered.filter(
+ [](const Resource& resource) { return resource.has_provider_id(); });
+ if (message.has_resource_providers() &&
+ !offeredResourceProviderResources.empty()) {
+ LOG(INFO)
+ << "Removing offer " << offer->id()
+ << " with resources " << offered << " on agent " << *slave;
rescind = true;
}
@@ -10604,16 +10534,17 @@ void Master::_apply(
// This must have been validated by the caller.
CHECK(!resourceProviderId.isError());
- Option<UUID> resourceVersion = resourceProviderId.isSome()
- ? slave->resourceVersions.get(resourceProviderId.get())
- : slave->resourceVersions.get(None());
+ CHECK(
+ resourceProviderId.isNone() ||
+ slave->resourceProviders.contains(resourceProviderId.get()))
+ << "Resource provider " + stringify(resourceProviderId.get()) +
+ " is unknown";
+
+ CHECK_SOME(slave->resourceVersion);
- CHECK_SOME(resourceVersion)
- << "Resource version of "
- << (resourceProviderId.isSome()
- ? "resource provider " + stringify(resourceProviderId.get())
- : "agent " + stringify(*slave))
- << " is unknown";
+ const UUID resourceVersion = resourceProviderId.isNone()
+ ? slave->resourceVersion.get()
+ : slave->resourceProviders.get(resourceProviderId.get())->resourceVersion;
Operation* operation = new Operation(
protobuf::createOperation(
@@ -10651,7 +10582,7 @@ void Master::_apply(
}
message.mutable_resource_version_uuid()->mutable_uuid()->CopyFrom(
- resourceVersion.get());
+ resourceVersion);
LOG(INFO) << "Sending operation '" << operation->info().id()
<< "' (uuid: " << operation->uuid() << ") "
@@ -11427,7 +11358,7 @@ Slave::Slave(
vector<SlaveInfo::Capability> _capabilites,
const Time& _registeredTime,
vector<Resource> _checkpointedResources,
- const Option<UUID>& resourceVersion,
+ const Option<UUID>& _resourceVersion,
vector<ExecutorInfo> executorInfos,
vector<Task> tasks)
: master(_master),
@@ -11441,6 +11372,7 @@ Slave::Slave(
connected(true),
active(true),
checkpointedResources(std::move(_checkpointedResources)),
+ resourceVersion(_resourceVersion),
observer(nullptr)
{
CHECK(info.has_id());
@@ -11453,10 +11385,6 @@ Slave::Slave(
CHECK_SOME(resources);
totalResources = resources.get();
- if (resourceVersion.isSome()) {
- resourceVersions.put(None(), resourceVersion.get());
- }
-
foreach (ExecutorInfo& executorInfo, executorInfos) {
CHECK(executorInfo.has_framework_id());
addExecutor(executorInfo.framework_id(), std::move(executorInfo));
@@ -11579,7 +11507,21 @@ void Slave::removeTask(Task* task)
void Slave::addOperation(Operation* operation)
{
- operations.put(operation->uuid(), operation);
+ Result<ResourceProviderID> resourceProviderId =
+ getResourceProviderId(operation->info());
+
+ CHECK(!resourceProviderId.isError()) << resourceProviderId.error();
+
+ if (resourceProviderId.isNone()) {
+ operations.put(operation->uuid(), operation);
+ } else {
+ CHECK(resourceProviders.contains(resourceProviderId.get()));
+
+ ResourceProvider& resourceProvider =
+ resourceProviders.at(resourceProviderId.get());
+
+ resourceProvider.operations.put(operation->uuid(), operation);
+ }
if (!protobuf::isSpeculativeOperation(operation->info()) &&
!protobuf::isTerminalState(operation->latest_status().state())) {
@@ -11629,16 +11571,38 @@ void Slave::removeOperation(Operation* operation)
{
const UUID& uuid = operation->uuid();
- CHECK(operations.contains(uuid))
- << "Unknown operation (uuid: " << uuid << ")"
- << " to agent " << *this;
+ Result<ResourceProviderID> resourceProviderId =
+ getResourceProviderId(operation->info());
+
+ CHECK(!resourceProviderId.isError()) << resourceProviderId.error();
+ // Recover the resource used by this operation.
if (!protobuf::isSpeculativeOperation(operation->info()) &&
!protobuf::isTerminalState(operation->latest_status().state())) {
recoverResources(operation);
}
- operations.erase(uuid);
+ // Remove the operation.
+ if (resourceProviderId.isNone()) {
+ CHECK(operations.contains(uuid))
+ << "Unknown operation (uuid: " << uuid << ")"
+ << " to agent " << *this;
+
+ operations.erase(operation->uuid());
+ } else {
+ CHECK(resourceProviders.contains(resourceProviderId.get()))
+ << "resource provider " << resourceProviderId.get() << " is unknown";
+
+ ResourceProvider& resourceProvider =
+ resourceProviders.at(resourceProviderId.get());
+
+ CHECK(resourceProvider.operations.contains(uuid))
+ << "Unknown operation (uuid: " << uuid << ")"
+ << " to resource provider " << resourceProviderId.get()
+ << " on agent " << *this;
+
+ resourceProvider.operations.erase(operation->uuid());
+ }
}
@@ -11647,6 +11611,13 @@ Operation* Slave::getOperation(const UUID& uuid) const
if (operations.contains(uuid)) {
return operations.at(uuid);
}
+
+ foreachvalue (const ResourceProvider& resourceProvider, resourceProviders) {
+ if (resourceProvider.operations.contains(uuid)) {
+ return resourceProvider.operations.at(uuid);
+ }
+ }
+
return nullptr;
}
@@ -11740,6 +11711,24 @@ void Slave::apply(const vector<ResourceConversion>& conversions)
totalResources = resources.get();
checkpointedResources = totalResources.filter(needCheckpointing);
+
+ // Also apply the conversion to the explicitly maintained resource
+ // provider resources.
+ foreach (const ResourceConversion& conversion, conversions) {
+ Result<ResourceProviderID> providerId = getResourceProviderId(conversion);
+
+ if (providerId.isNone()) {
+ continue;
+ }
+
+ CHECK_SOME(providerId);
+ CHECK(resourceProviders.contains(providerId.get()));
+ ResourceProvider& provider = resourceProviders.at(providerId.get());
+
+ CHECK(provider.totalResources.contains(conversion.consumed));
+ provider.totalResources -= conversion.consumed;
+ provider.totalResources += conversion.converted;
+ }
}
@@ -11748,7 +11737,7 @@ Try<Nothing> Slave::update(
const string& _version,
const vector<SlaveInfo::Capability>& _capabilities,
const Resources& _checkpointedResources,
- const Option<UUID>& resourceVersion)
+ const Option<UUID>& _resourceVersion)
{
Try<Resources> resources = applyCheckpointedResources(
_info.resources(),
@@ -11770,9 +11759,7 @@ Try<Nothing> Slave::update(
// reregistering in this case.
totalResources = resources.get();
- if (resourceVersion.isSome()) {
- resourceVersions.put(None(), resourceVersion.get());
- }
+ resourceVersion = _resourceVersion;
return Nothing();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/6fe66ce0/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 8bf2c76..896995f 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -124,7 +124,7 @@ Slave(Master* const _master,
std::vector<SlaveInfo::Capability> _capabilites,
const process::Time& _registeredTime,
std::vector<Resource> _checkpointedResources,
- const Option<UUID>& resourceVersion,
+ const Option<UUID>& _resourceVersion,
std::vector<ExecutorInfo> executorInfos = std::vector<ExecutorInfo>(),
std::vector<Task> tasks = std::vector<Task>());
@@ -276,10 +276,44 @@ Slave(Master* const _master,
// providers as well.
Resources totalResources;
+ // Resource version UUID of the agent's default resources, i.e., the
+ // resources that are not managed by any resource provider. It is used
+ // to establish the relationship between an operation and the resources
+ // that the operation is operating on: the master attaches the last
+ // known resource version UUID to each operation it sends out for these
+ // resources. The version changes when the resources are believed to be
+ // out of sync from the master's view, and operations carrying a
+ // different resource version UUID than the one currently maintained
+ // should be rejected, because they might operate on resources that
+ // have already been invalidated. Resources of resource providers are
+ // versioned separately, see `ResourceProvider::resourceVersion`.
+ Option<UUID> resourceVersion;
+
SlaveObserver* observer;
- hashmap<Option<ResourceProviderID>, UUID> resourceVersions;
- hashmap<ResourceProviderID, ResourceProviderInfo> resourceProviders;
+ struct ResourceProvider {
+ ResourceProviderInfo info;
+ Resources totalResources; // Updated by agent updates and conversions.
+
+ // Last known resource version UUID of this resource provider. It is
+ // used to establish the relationship between an operation and the
+ // resources that the operation is operating on. The resource provider
+ // changes it when it believes that its resources are out of sync from
+ // the master's view. The master attaches it to each operation it sends
+ // out for this provider's resources, and the resource provider should
+ // reject operations that carry a different resource version UUID than
+ // the one it maintains, because such operations might operate on
+ // resources that have already been invalidated. `Master::updateSlave`
+ // refreshes it from the resource version reported in an agent's
+ // `UpdateSlaveMessage`.
+ UUID resourceVersion;
+
+ // Pending operations or terminal operations with unacknowledged
+ // status updates, keyed by operation UUID.
+ hashmap<UUID, Operation*> operations;
+ };
+
+ hashmap<ResourceProviderID, ResourceProvider> resourceProviders;
private:
Slave(const Slave&); // No copying.
[2/3] mesos git commit: Added helper function to determine provider
ID of a conversion.
Posted by bb...@apache.org.
Added helper function to determine provider ID of a conversion.
Review: https://reviews.apache.org/r/65590/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/729cb5b0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/729cb5b0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/729cb5b0
Branch: refs/heads/master
Commit: 729cb5b0b4fc03fc5a818a33a0be32c1e9940577
Parents: 69de373
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Wed Mar 14 13:04:10 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Mar 14 13:04:10 2018 +0100
----------------------------------------------------------------------
src/common/resources_utils.cpp | 28 ++++++++++++++++++++++++++++
src/common/resources_utils.hpp | 8 ++++++++
2 files changed, 36 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/729cb5b0/src/common/resources_utils.cpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp
index 99b16e0..9be01c1 100644
--- a/src/common/resources_utils.cpp
+++ b/src/common/resources_utils.cpp
@@ -285,6 +285,34 @@ Result<ResourceProviderID> getResourceProviderId(
}
+Result<ResourceProviderID> getResourceProviderId(
+ const ResourceConversion& conversion)
+{
+ if (conversion.consumed.empty()) {
+ return Error("Could not determine resource provider");
+ }
+
+ const Resource& consumed = *conversion.consumed.begin();
+ // Agent default resources carry no provider ID and map to None().
+ const Option<ResourceProviderID> resourceProviderId =
+ consumed.has_provider_id()
+ ? consumed.provider_id()
+ : Option<ResourceProviderID>::none();
+
+ // Every consumed resource must share that (possibly absent) provider.
+ foreach (const Resource& resource, conversion.consumed) {
+ const Option<ResourceProviderID> resourceProviderId_ =
+ resource.has_provider_id()
+ ? resource.provider_id()
+ : Option<ResourceProviderID>::none();
+ if (resourceProviderId_ != resourceProviderId) {
+ return Error("Conversion works on multiple resource providers");
+ }
+ }
+
+ return resourceProviderId;
+}
+
void convertResourceFormat(Resource* resource, ResourceFormat format)
{
switch (format) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/729cb5b0/src/common/resources_utils.hpp
----------------------------------------------------------------------
diff --git a/src/common/resources_utils.hpp b/src/common/resources_utils.hpp
index 73d070d..74cf747 100644
--- a/src/common/resources_utils.hpp
+++ b/src/common/resources_utils.hpp
@@ -58,6 +58,14 @@ Result<ResourceProviderID> getResourceProviderId(
const Offer::Operation& operation);
+// Returns the ID of the resource provider affected by a resource
+// conversion, or None() if it consumes only agent default resources.
+// Returns an Error if the conversion consumes no resources or consumes
+// resources from more than one resource provider.
+Result<ResourceProviderID> getResourceProviderId(
+ const ResourceConversion& conversion);
+
+
// Returns the resource conversions from the given offer operation.
// This helper assumes that the given operation has already been
// validated.