You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ji...@apache.org on 2017/11/06 23:06:56 UTC

[06/14] mesos git commit: Added the initial implementation for applying offer operations.

Added the initial implementation for applying offer operations.

The resource provider manager provides an `applyOfferOperation` method
for offer operations affecting resource providers. The resources on
which the operation should be applied contain a resource provider ID.
This ID will be extracted and an event will be sent to the respective
resource provider.

(This is based on https://reviews.apache.org/r/61810)

Review: https://reviews.apache.org/r/63480


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/97062ac8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/97062ac8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/97062ac8

Branch: refs/heads/master
Commit: 97062ac861b2642d6a882226b767f3ccd1a3c1db
Parents: 9cdac39
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Mon Oct 30 13:57:09 2017 +0100
Committer: Jie Yu <yu...@gmail.com>
Committed: Mon Nov 6 14:37:26 2017 -0800

----------------------------------------------------------------------
 src/messages/messages.proto       |   4 +-
 src/resource_provider/manager.cpp | 128 +++++++++++++++++++++++++++------
 src/resource_provider/manager.hpp |   4 ++
 src/slave/slave.cpp               |  34 ++++++++-
 4 files changed, 148 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/messages/messages.proto
----------------------------------------------------------------------
diff --git a/src/messages/messages.proto b/src/messages/messages.proto
index 2fbca22..1610c2b 100644
--- a/src/messages/messages.proto
+++ b/src/messages/messages.proto
@@ -705,7 +705,9 @@ message OfferOperationStatusUpdate {
 
 
 /**
- * This message is sent from the master to the agent to apply an offer
+ * This message is sent from the master to the resource provider
+ * manager (either on the agent for local resource providers, or on
+ * the master for external resource providers) to apply an offer
  * operation.
  *
  * See resource_provider::Event::OPERATION.

http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 11f8901..a878507 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -37,6 +37,7 @@
 
 #include "common/http.hpp"
 #include "common/recordio.hpp"
+#include "common/resources_utils.hpp"
 
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
@@ -52,7 +53,6 @@ using mesos::resource_provider::Event;
 
 using process::Failure;
 using process::Future;
-using process::Owned;
 using process::Process;
 using process::ProcessBase;
 using process::Queue;
@@ -140,9 +140,9 @@ public:
       const http::Request& request,
       const Option<Principal>& principal);
 
-  Queue<ResourceProviderMessage> messages;
+  void applyOfferOperation(const ApplyOfferOperationMessage& message);
 
-  hashmap<ResourceProviderID, ResourceProvider> resourceProviders;
+  Queue<ResourceProviderMessage> messages;
 
 private:
   void subscribe(
@@ -158,6 +158,11 @@ private:
       const Call::UpdateState& update);
 
   ResourceProviderID newResourceProviderId();
+
+  struct ResourceProviders
+  {
+    hashmap<ResourceProviderID, ResourceProvider> subscribed;
+  } resourceProviders;
 };
 
 
@@ -254,11 +259,12 @@ Future<http::Response> ResourceProviderManagerProcess::api(
     return ok;
   }
 
-  if (!resourceProviders.contains(call.resource_provider_id())) {
-    return BadRequest("Resource provider cannot be found");
+  if (!resourceProviders.subscribed.contains(call.resource_provider_id())) {
+    return BadRequest("Resource provider is not subscribed");
   }
 
-  auto resourceProvider = resourceProviders.at(call.resource_provider_id());
+  ResourceProvider& resourceProvider =
+    resourceProviders.subscribed.at(call.resource_provider_id());
 
   // This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
   if (!request.headers.contains("Mesos-Stream-Id")) {
@@ -302,6 +308,69 @@ Future<http::Response> ResourceProviderManagerProcess::api(
 }
 
 
+void ResourceProviderManagerProcess::applyOfferOperation(
+    const ApplyOfferOperationMessage& message)
+{
+  const Offer::Operation& operation = message.operation_info();
+  const FrameworkID& frameworkId = message.framework_id();
+
+  Try<UUID> uuid = UUID::fromBytes(message.operation_uuid());
+  if (uuid.isError()) {
+    LOG(ERROR) << "Failed to parse offer operation UUID for operation "
+               << "'" << operation.id() << "' from framework "
+               << frameworkId << ": " << uuid.error();
+    return;
+  }
+
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(operation);
+
+  if (!resourceProviderId.isSome()) {
+    LOG(ERROR) << "Failed to get the resource provider ID of operation "
+               << "'" << operation.id() << "' (uuid: " << uuid->toString()
+               << ") from framework " << frameworkId << ": "
+               << (resourceProviderId.isError() ? resourceProviderId.error()
+                                                : "Not found");
+    return;
+  }
+
+  if (!resourceProviders.subscribed.contains(resourceProviderId.get())) {
+    LOG(WARNING) << "Dropping operation '" << operation.id() << "' (uuid: "
+                 << uuid.get() << ") from framework " << frameworkId
+                 << " because resource provider " << resourceProviderId.get()
+                 << " is not subscribed";
+    return;
+  }
+
+  ResourceProvider& resourceProvider =
+    resourceProviders.subscribed.at(resourceProviderId.get());
+
+  CHECK(message.resource_version_uuid().has_resource_provider_id());
+
+  CHECK_EQ(message.resource_version_uuid().resource_provider_id(),
+           resourceProviderId.get())
+    << "Resource provider ID "
+    << message.resource_version_uuid().resource_provider_id()
+    << " in resource version UUID does not match that in the operation "
+    << resourceProviderId.get();
+
+  Event event;
+  event.set_type(Event::OPERATION);
+  event.mutable_operation()->mutable_framework_id()->CopyFrom(frameworkId);
+  event.mutable_operation()->mutable_info()->CopyFrom(operation);
+  event.mutable_operation()->set_operation_uuid(message.operation_uuid());
+  event.mutable_operation()->set_resource_version_uuid(
+      message.resource_version_uuid().uuid());
+
+  if (!resourceProvider.http.send(event)) {
+    LOG(WARNING) << "Failed to send operation '" << operation.id() << "' "
+                 << "(uuid: " << uuid.get() << ") from framework "
+                 << frameworkId << " to resource provider "
+                 << resourceProviderId.get() << ": connection closed";
+  }
+}
+
+
 void ResourceProviderManagerProcess::subscribe(
     const HttpConnection& http,
     const Call::Subscribe& subscribe)
@@ -309,27 +378,36 @@ void ResourceProviderManagerProcess::subscribe(
   ResourceProviderInfo resourceProviderInfo =
     subscribe.resource_provider_info();
 
-  // TODO(chhsiao): Reject the subscription if it contains an unknown ID
-  // or there is already a subscribed instance with the same ID, and add
-  // tests for re-subscriptions.
+  LOG(INFO) << "Subscribing resource provider " << resourceProviderInfo;
+
   if (!resourceProviderInfo.has_id()) {
+    // The resource provider is subscribing for the first time.
     resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId());
-  }
 
-  ResourceProvider resourceProvider(resourceProviderInfo, http);
+    ResourceProvider resourceProvider(resourceProviderInfo, http);
 
-  Event event;
-  event.set_type(Event::SUBSCRIBED);
-  event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
-      resourceProvider.info.id());
+    Event event;
+    event.set_type(Event::SUBSCRIBED);
+    event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
+        resourceProvider.info.id());
 
-  if (!resourceProvider.http.send(event)) {
-    LOG(WARNING) << "Unable to send event to resource provider "
-                 << stringify(resourceProvider.info.id())
-                 << ": connection closed";
+    if (!resourceProvider.http.send(event)) {
+      LOG(WARNING) << "Failed to send SUBSCRIBED event to resource provider "
+                   << resourceProvider.info.id() << ": connection closed";
+    }
+
+    // TODO(jieyu): Start heartbeat for the resource provider.
+
+    resourceProviders.subscribed.put(
+        resourceProviderInfo.id(),
+        resourceProvider);
+
+    return;
   }
 
-  resourceProviders.put(resourceProviderInfo.id(), std::move(resourceProvider));
+  // TODO(chhsiao): Reject the subscription if it contains an unknown
+  // ID or there is already a subscribed instance with the same ID,
+  // and add tests for re-subscriptions.
 }
 
 
@@ -402,6 +480,16 @@ Future<http::Response> ResourceProviderManager::api(
 }
 
 
+void ResourceProviderManager::applyOfferOperation(
+    const ApplyOfferOperationMessage& message) const
+{
+  return dispatch(
+      process.get(),
+      &ResourceProviderManagerProcess::applyOfferOperation,
+      message);
+}
+
+
 Queue<ResourceProviderMessage> ResourceProviderManager::messages() const
 {
   return process->messages;

http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/resource_provider/manager.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index 3b70e75..e7a9a6c 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -23,6 +23,8 @@
 #include <process/owned.hpp>
 #include <process/queue.hpp>
 
+#include "messages/messages.hpp"
+
 #include "resource_provider/message.hpp"
 
 namespace mesos {
@@ -49,6 +51,8 @@ public:
       const process::http::Request& request,
       const Option<process::http::authentication::Principal>& principal) const;
 
+  void applyOfferOperation(const ApplyOfferOperationMessage& message) const;
+
   // Returns a stream of messages from the resource provider manager.
   process::Queue<ResourceProviderMessage> messages() const;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/97062ac8/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 6cbe209..c108239 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -3626,7 +3626,39 @@ Try<Nothing> Slave::syncCheckpointedResources(
 
 void Slave::applyOfferOperation(const ApplyOfferOperationMessage& message)
 {
-  // TODO(nfnt): Provide implementation here.
+  Try<UUID> uuid = UUID::fromBytes(message.operation_uuid());
+  if (uuid.isError()) {
+    LOG(ERROR) << "Failed to parse offer operation UUID for operation "
+               << "'" << message.operation_info().id() << "' "
+               << "from framework " << message.framework_id()
+               << ": " << uuid.error();
+    return;
+  }
+
+  Result<ResourceProviderID> resourceProviderId =
+    getResourceProviderId(message.operation_info());
+
+  if (resourceProviderId.isError()) {
+    LOG(ERROR) << "Failed to get the resource provider ID of operation "
+               << "'" << message.operation_info().id() << "' "
+               << "(uuid: " << uuid->toString() << ") from framework "
+               << message.framework_id() << ": " << resourceProviderId.error();
+    return;
+  }
+
+  if (resourceProviderId.isSome()) {
+    resourceProviderManager.applyOfferOperation(message);
+    return;
+  }
+
+  // TODO(jieyu): Handle operations for agent default resources. To
+  // support rollback, the agent needs to checkpoint the total
+  // resources using the old format (i.e., using `resources.info`).
+  // It's OK that the offer operations are not checkpointed atomically
+  // with the total resources for agent default resources. This is
+  // because the master does not rely on operation feedback to update
+  // the allocation for old operations, and agent default resources
+  // only support old operations.
 }