You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2017/12/07 19:54:55 UTC

[5/6] mesos git commit: Added status update acknowledgement to resource provider manager.

Added status update acknowledgement to resource provider manager.

When the agent receives an offer operation update acknowledgement
from the master, it is forwarded to the relevant resource
provider via the resource provider manager.

Review: https://reviews.apache.org/r/64145/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/761f47fe
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/761f47fe
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/761f47fe

Branch: refs/heads/master
Commit: 761f47fe8e51ca3065a156308220e2a1ab61a628
Parents: cfb634b
Author: Greg Mann <gr...@mesosphere.io>
Authored: Thu Dec 7 11:36:20 2017 -0800
Committer: Greg Mann <gr...@gmail.com>
Committed: Thu Dec 7 11:38:45 2017 -0800

----------------------------------------------------------------------
 src/resource_provider/manager.cpp          | 47 +++++++++++++++++++++++++
 src/resource_provider/manager.hpp          |  5 +++
 src/resource_provider/storage/provider.cpp | 10 ++++++
 3 files changed, 62 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/761f47fe/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 8d8b2f1..75eb6c1 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -158,6 +158,9 @@ public:
 
   void applyOfferOperation(const ApplyOfferOperationMessage& message);
 
+  void acknowledgeOfferOperationUpdate(
+      const OfferOperationUpdateAcknowledgementMessage& message);
+
   Future<Nothing> publish(const Resources& resources);
 
   Queue<ResourceProviderMessage> messages;
@@ -398,6 +401,40 @@ void ResourceProviderManagerProcess::applyOfferOperation(
 }
 
 
+void ResourceProviderManagerProcess::acknowledgeOfferOperationUpdate(
+    const OfferOperationUpdateAcknowledgementMessage& message)
+{
+  CHECK(message.has_resource_provider_id());
+
+  if (!resourceProviders.subscribed.contains(message.resource_provider_id())) {
+    LOG(WARNING) << "Dropping offer operation update acknowledgement with"
+                 << " status_uuid " << message.status_uuid() << " and"
+                 << " operation_uuid " << message.operation_uuid() << " because"
+                 << " resource provider " << message.resource_provider_id()
+                 << " is not subscribed";
+    return;
+  }
+
+  ResourceProvider& resourceProvider =
+    *resourceProviders.subscribed.at(message.resource_provider_id());
+
+  Event event;
+  event.set_type(Event::ACKNOWLEDGE_OFFER_OPERATION);
+  event.mutable_acknowledge_offer_operation()
+    ->set_status_uuid(message.status_uuid());
+  event.mutable_acknowledge_offer_operation()
+    ->set_operation_uuid(message.operation_uuid());
+
+  if (!resourceProvider.http.send(event)) {
+    LOG(WARNING) << "Failed to send offer operation update acknowledgement with"
+                 << " status_uuid " << message.status_uuid() << " and"
+                 << " operation_uuid " << message.operation_uuid() << " to"
+                 << " resource provider " << message.resource_provider_id()
+                 << ": connection closed";
+  }
+}
+
+
 Future<Nothing> ResourceProviderManagerProcess::publish(
     const Resources& resources)
 {
@@ -638,6 +675,16 @@ void ResourceProviderManager::applyOfferOperation(
 }
 
 
+void ResourceProviderManager::acknowledgeOfferOperationUpdate(
+    const OfferOperationUpdateAcknowledgementMessage& message) const
+{
+  return dispatch(
+      process.get(),
+      &ResourceProviderManagerProcess::acknowledgeOfferOperationUpdate,
+      message);
+}
+
+
 Future<Nothing> ResourceProviderManager::publish(const Resources& resources)
 {
   return dispatch(

http://git-wip-us.apache.org/repos/asf/mesos/blob/761f47fe/src/resource_provider/manager.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index c2aeb15..9f9b201 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -53,6 +53,11 @@ public:
 
   void applyOfferOperation(const ApplyOfferOperationMessage& message) const;
 
+  // Forwards an offer operation status update acknowledgement
+  // to the relevant resource provider.
+  void acknowledgeOfferOperationUpdate(
+      const OfferOperationUpdateAcknowledgementMessage& message) const;
+
   // Ensure that the resources are ready for use.
   process::Future<Nothing> publish(const Resources& resources);
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/761f47fe/src/resource_provider/storage/provider.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/storage/provider.cpp b/src/resource_provider/storage/provider.cpp
index b9eb4f8..e771af6 100644
--- a/src/resource_provider/storage/provider.cpp
+++ b/src/resource_provider/storage/provider.cpp
@@ -326,6 +326,8 @@ private:
   void subscribed(const Event::Subscribed& subscribed);
   void operation(const Event::Operation& operation);
   void publish(const Event::Publish& publish);
+  void acknowledgeOfferOperation(
+      const Event::AcknowledgeOfferOperation& acknowledge);
 
   Future<csi::Client> connect(const string& endpoint);
   Future<csi::Client> getService(const ContainerID& containerId);
@@ -425,6 +427,7 @@ void StorageLocalResourceProviderProcess::received(const Event& event)
     }
     case Event::ACKNOWLEDGE_OFFER_OPERATION: {
       CHECK(event.has_acknowledge_offer_operation());
+      acknowledgeOfferOperation(event.acknowledge_offer_operation());
       break;
     }
     case Event::UNKNOWN: {
@@ -1052,6 +1055,13 @@ void StorageLocalResourceProviderProcess::publish(const Event::Publish& publish)
 }
 
 
+void StorageLocalResourceProviderProcess::acknowledgeOfferOperation(
+    const Event::AcknowledgeOfferOperation& acknowledge)
+{
+  CHECK_EQ(SUBSCRIBED, state);
+}
+
+
 // Returns a future of a CSI client that waits for the endpoint socket
 // to appear if necessary, then connects to the socket and check its
 // supported version.