You are viewing a plain-text version of this content. (The canonical link to it was not preserved in this rendering.)
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/11/06 23:06:56 UTC
[06/14] mesos git commit: Added the initial implementation for applying offer operations.
Added the initial implementation for applying offer operations.
The resource provider manager provides an `applyOfferOperation` method
for offer operations affecting resource providers. The resources to
which the operation is applied contain a resource provider ID. This ID
is extracted, and an event is sent to the respective resource
provider.
(This is based on https://reviews.apache.org/r/61810)
Review: https://reviews.apache.org/r/63480
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/97062ac8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/97062ac8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/97062ac8
Branch: refs/heads/master
Commit: 97062ac861b2642d6a882226b767f3ccd1a3c1db
Parents: 9cdac39
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Mon Oct 30 13:57:09 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800
----------------------------------------------------------------------
src/messages/messages.proto | 4 +-
src/resource_provider/manager.cpp | 128 +++++++++++++++++++++++++++------
src/resource_provider/manager.hpp | 4 ++
src/slave/slave.cpp | 34 ++++++++-
4 files changed, 148 insertions(+), 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 2fbca22..1610c2b 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -705,7 +705,9 @@ message OfferOperationStatusUpdate {
/**
- * This message is sent from the master to the agent to apply an offer
+ * This message is sent from the master to the resource provider
+ * manager (either on the agent for local resource providers, or on
+ * the master for external resource providers) to apply an offer
* operation.
*
* See resource_provider::Event::OPERATION.
http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 11f8901..a878507 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -37,6 +37,7 @@
#include "common/http.hpp"
#include "common/recordio.hpp"
+#include "common/resources_utils.hpp"
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
@@ -52,7 +53,6 @@ using mesos::resource_provider::Event;
using process::Failure;
using process::Future;
-using process::Owned;
using process::Process;
using process::ProcessBase;
using process::Queue;
@@ -140,9 +140,9 @@ public:
const http::Request& request,
const Option<Principal>& principal);
- Queue<ResourceProviderMessage> messages;
+ void applyOfferOperation(const ApplyOfferOperationMessage& message);
- hashmap<ResourceProviderID, ResourceProvider> resourceProviders;
+ Queue<ResourceProviderMessage> messages;
private:
void subscribe(
@@ -158,6 +158,11 @@ private:
const Call::UpdateState& update);
ResourceProviderID newResourceProviderId();
+
+ struct ResourceProviders
+ {
+ hashmap<ResourceProviderID, ResourceProvider> subscribed;
+ } resourceProviders;
};
@@ -254,11 +259,12 @@ Future<http::Response> ResourceProviderManagerProcess::api(
return ok;
}
- if (!resourceProviders.contains(call.resource_provider_id())) {
- return BadRequest("Resource provider cannot be found");
+ if (!resourceProviders.subscribed.contains(call.resource_provider_id())) {
+ return BadRequest("Resource provider is not subscribed");
}
- auto resourceProvider = resourceProviders.at(call.resource_provider_id());
+ ResourceProvider& resourceProvider =
+ resourceProviders.subscribed.at(call.resource_provider_id());
// This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
if (!request.headers.contains("Mesos-Stream-Id")) {
@@ -302,6 +308,69 @@ Future<http::Response> ResourceProviderManagerProcess::api(
}
+void ResourceProviderManagerProcess::applyOfferOperation(
+ const ApplyOfferOperationMessage& message)
+{
+ const Offer::Operation& operation = message.operation_info();
+ const FrameworkID& frameworkId = message.framework_id();
+
+ Try<UUID> uuid = UUID::fromBytes(message.operation_uuid());
+ if (uuid.isError()) {
+ LOG(ERROR) << "Failed to parse offer operation UUID for operation "
+ << "'" << operation.id() << "' from framework "
+ << frameworkId << ": " << uuid.error();
+ return;
+ }
+
+ Result<ResourceProviderID> resourceProviderId =
+ getResourceProviderId(operation);
+
+ if (!resourceProviderId.isSome()) {
+ LOG(ERROR) << "Failed to get the resource provider ID of operation "
+ << "'" << operation.id() << "' (uuid: " << uuid->toString()
+ << ") from framework " << frameworkId << ": "
+ << (resourceProviderId.isError() ? resourceProviderId.error()
+ : "Not found");
+ return;
+ }
+
+ if (!resourceProviders.subscribed.contains(resourceProviderId.get())) {
+ LOG(WARNING) << "Dropping operation '" << operation.id() << "' (uuid: "
+ << uuid.get() << ") from framework " << frameworkId
+ << " because resource provider " << resourceProviderId.get()
+ << " is not subscribed";
+ return;
+ }
+
+ ResourceProvider& resourceProvider =
+ resourceProviders.subscribed.at(resourceProviderId.get());
+
+ CHECK(message.resource_version_uuid().has_resource_provider_id());
+
+ CHECK_EQ(message.resource_version_uuid().resource_provider_id(),
+ resourceProviderId.get())
+ << "Resource provider ID "
+ << message.resource_version_uuid().resource_provider_id()
+ << " in resource version UUID does not match that in the operation "
+ << resourceProviderId.get();
+
+ Event event;
+ event.set_type(Event::OPERATION);
+ event.mutable_operation()->mutable_framework_id()->CopyFrom(frameworkId);
+ event.mutable_operation()->mutable_info()->CopyFrom(operation);
+ event.mutable_operation()->set_operation_uuid(message.operation_uuid());
+ event.mutable_operation()->set_resource_version_uuid(
+ message.resource_version_uuid().uuid());
+
+ if (!resourceProvider.http.send(event)) {
+ LOG(WARNING) << "Failed to send operation '" << operation.id() << "' "
+ << "(uuid: " << uuid.get() << ") from framework "
+ << frameworkId << " to resource provider "
+ << resourceProviderId.get() << ": connection closed";
+ }
+}
+
+
void ResourceProviderManagerProcess::subscribe(
const HttpConnection& http,
const Call::Subscribe& subscribe)
@@ -309,27 +378,36 @@ void ResourceProviderManagerProcess::subscribe(
ResourceProviderInfo resourceProviderInfo =
subscribe.resource_provider_info();
- // TODO(chhsiao): Reject the subscription if it contains an unknown ID
- // or there is already a subscribed instance with the same ID, and add
- // tests for re-subscriptions.
+ LOG(INFO) << "Subscribing resource provider " << resourceProviderInfo;
+
if (!resourceProviderInfo.has_id()) {
+ // The resource provider is subscribing for the first time.
resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId());
- }
- ResourceProvider resourceProvider(resourceProviderInfo, http);
+ ResourceProvider resourceProvider(resourceProviderInfo, http);
- Event event;
- event.set_type(Event::SUBSCRIBED);
- event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
- resourceProvider.info.id());
+ Event event;
+ event.set_type(Event::SUBSCRIBED);
+ event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
+ resourceProvider.info.id());
- if (!resourceProvider.http.send(event)) {
- LOG(WARNING) << "Unable to send event to resource provider "
- << stringify(resourceProvider.info.id())
- << ": connection closed";
+ if (!resourceProvider.http.send(event)) {
+ LOG(WARNING) << "Failed to send SUBSCRIBED event to resource provider "
+ << resourceProvider.info.id() << ": connection closed";
+ }
+
+ // TODO(jieyu): Start heartbeat for the resource provider.
+
+ resourceProviders.subscribed.put(
+ resourceProviderInfo.id(),
+ resourceProvider);
+
+ return;
}
- resourceProviders.put(resourceProviderInfo.id(), std::move(resourceProvider));
+ // TODO(chhsiao): Reject the subscription if it contains an unknown
+ // ID or there is already a subscribed instance with the same ID,
+ // and add tests for re-subscriptions.
}
@@ -402,6 +480,16 @@ Future<http::Response> ResourceProviderManager::api(
}
+void ResourceProviderManager::applyOfferOperation(
+ const ApplyOfferOperationMessage& message) const
+{
+ return dispatch(
+ process.get(),
+ &ResourceProviderManagerProcess::applyOfferOperation,
+ message);
+}
+
+
Queue<ResourceProviderMessage> ResourceProviderManager::messages() const
{
return process->messages;
http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/resource_provider/manager.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index 3b70e75..e7a9a6c 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -23,6 +23,8 @@
#include <process/owned.hpp>
#include <process/queue.hpp>
+#include "messages/messages.hpp"
+
#include "resource_provider/message.hpp"
namespace mesos {
@@ -49,6 +51,8 @@ public:
const process::http::Request& request,
const Option<process::http::authentication::Principal>& principal) const;
+ void applyOfferOperation(const ApplyOfferOperationMessage& message) const;
+
// Returns a stream of messages from the resource provider manager.
process::Queue<ResourceProviderMessage> messages() const;
http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6cbe209..c108239 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3626,7 +3626,39 @@ Try<Nothing> Slave::syncCheckpointedResources(
void Slave::applyOfferOperation(const ApplyOfferOperationMessage& message)
{
- // TODO(nfnt): Provide implementation here.
+ Try<UUID> uuid = UUID::fromBytes(message.operation_uuid());
+ if (uuid.isError()) {
+ LOG(ERROR) << "Failed to parse offer operation UUID for operation "
+ << "'" << message.operation_info().id() << "' "
+ << "from framework " << message.framework_id()
+ << ": " << uuid.error();
+ return;
+ }
+
+ Result<ResourceProviderID> resourceProviderId =
+ getResourceProviderId(message.operation_info());
+
+ if (resourceProviderId.isError()) {
+ LOG(ERROR) << "Failed to get the resource provider ID of operation "
+ << "'" << message.operation_info().id() << "' "
+ << "(uuid: " << uuid->toString() << ") from framework "
+ << message.framework_id() << ": " << resourceProviderId.error();
+ return;
+ }
+
+ if (resourceProviderId.isSome()) {
+ resourceProviderManager.applyOfferOperation(message);
+ return;
+ }
+
+ // TODO(jieyu): Handle operations for agent default resources. To
+ // support rollback, the agent need to checkpoint the total
+ // resources using the old format (i.e., using `resources.info`).
+ // It's OK that the offer operations are not checkpointed atomically
+ // with the total resources for agent default resources. This is
+ // because the master does not rely on operation feedback to update
+ // the allocation for old operations, and agent default resources
+ // only support old operations.
}