You are viewing a plain text version of this content. The canonical link for it is not included in this plain-text copy.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/04/25 10:58:19 UTC

[02/10] mesos git commit: Delayed construction of the agent's resource provider manager.

Delayed construction of the agent's resource provider manager.

By delaying the construction of the agent's resource provider manager,
we prepare for a follow-up patch that makes the resource provider
manager depend on the agent's ID.

Depending on whether the agent was able to recover an agent ID from
its log or still needs to obtain one by registering with the master
for the first time, the resource provider manager can only be
constructed at different points in the agent's initialization: after
recovery in the former case, and after the first registration in the
latter. To share the common code, we introduce a helper function that
performs the necessary steps.

Review: https://reviews.apache.org/r/66308/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0424a662
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0424a662
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0424a662

Branch: refs/heads/master
Commit: 0424a6623d08440d8dbe5aff5ec2f18df7b93e24
Parents: defbe56
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue Apr 24 11:36:07 2018 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Apr 25 12:34:09 2018 +0200

----------------------------------------------------------------------
 src/slave/slave.cpp                           | 94 +++++++++++++++++-----
 src/slave/slave.hpp                           |  6 +-
 src/tests/resource_provider_manager_tests.cpp |  9 ++-
 3 files changed, 87 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0424a662/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d0ff5f8..d313777 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -30,6 +30,8 @@
 #include <utility>
 #include <vector>
 
+#include <glog/logging.h>
+
 #include <mesos/type_utils.hpp>
 
 #include <mesos/authentication/secret_generator.hpp>
@@ -772,15 +774,20 @@ void Slave::initialize()
           logRequest(request);
           return http.executor(request, principal);
         });
+  route(
+      "/api/v1/resource_provider",
+      READWRITE_HTTP_AUTHENTICATION_REALM,
+      Http::RESOURCE_PROVIDER_HELP(),
+      [this](const http::Request& request, const Option<Principal>& principal)
+        -> Future<http::Response> {
+        logRequest(request);
+
+        if (resourceProviderManager.get() == nullptr) {
+          return http::ServiceUnavailable();
+        }
 
-  route("/api/v1/resource_provider",
-        READWRITE_HTTP_AUTHENTICATION_REALM,
-        Http::RESOURCE_PROVIDER_HELP(),
-        [this](const http::Request& request,
-               const Option<Principal>& principal) {
-          logRequest(request);
-          return resourceProviderManager.api(request, principal);
-        });
+        return resourceProviderManager->api(request, principal);
+      });
 
   // TODO(ijimenez): Remove this endpoint at the end of the
   // deprecation cycle on 0.26.
@@ -1502,6 +1509,8 @@ void Slave::registered(
 
       CHECK_SOME(state::checkpoint(path, info));
 
+      initializeResourceProviderManager(flags, info.id());
+
       // We start the local resource providers daemon once the agent is
       // running, so the resource providers can use the agent API.
       localResourceProviderDaemon->start(info.id());
@@ -4344,7 +4353,7 @@ void Slave::applyOperation(const ApplyOperationMessage& message)
   }
 
   if (resourceProviderId.isSome()) {
-    resourceProviderManager.applyOperation(message);
+    CHECK_NOTNULL(resourceProviderManager.get())->applyOperation(message);
     return;
   }
 
@@ -4417,7 +4426,7 @@ void Slave::reconcileOperations(const ReconcileOperationsMessage& message)
   }
 
   if (containsResourceProviderOperations) {
-    resourceProviderManager.reconcileOperations(message);
+    CHECK_NOTNULL(resourceProviderManager.get())->reconcileOperations(message);
   }
 }
 
@@ -4549,7 +4558,19 @@ void Slave::operationStatusAcknowledgement(
 {
   Operation* operation = getOperation(acknowledgement.operation_uuid());
   if (operation != nullptr) {
-    resourceProviderManager.acknowledgeOperationStatus(acknowledgement);
+    // If the operation was on resource provider resources forward the
+    // acknowledgement to the resource provider manager as well.
+    Result<ResourceProviderID> resourceProviderId =
+      getResourceProviderId(operation->info());
+
+    CHECK(!resourceProviderId.isError())
+      << "Could not determine resource provider of operation " << operation
+      << ": " << resourceProviderId.error();
+
+    if (resourceProviderId.isSome()) {
+      CHECK_NOTNULL(resourceProviderManager.get())
+        ->acknowledgeOperationStatus(acknowledgement);
+    }
 
     CHECK(operation->statuses_size() > 0);
     if (protobuf::isTerminalState(
@@ -7319,10 +7340,8 @@ void Slave::__recover(const Future<Nothing>& future)
     detection = detector->detect()
       .onAny(defer(self(), &Slave::detected, lambda::_1));
 
-    if (capabilities.resourceProvider) {
-      // Start listening for messages from the resource provider manager.
-      resourceProviderManager.messages().get().onAny(
-          defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+    if (info.has_id()) {
+      initializeResourceProviderManager(flags, info.id());
     }
 
     // Forward oversubscribed resources.
@@ -7600,7 +7619,7 @@ void Slave::handleResourceProviderMessage(
                << (message.isFailed() ? message.failure() : "future discarded");
 
     // Wait for the next message.
-    resourceProviderManager.messages().get()
+    CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
       .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
 
     return;
@@ -7859,7 +7878,7 @@ void Slave::handleResourceProviderMessage(
   }
 
   // Wait for the next message.
-  resourceProviderManager.messages().get()
+  CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
     .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
 }
 
@@ -8114,6 +8133,24 @@ void Slave::apply(Operation* operation)
 Future<Nothing> Slave::publishResources(
     const Option<Resources>& additionalResources)
 {
+  // If the resource provider manager has not been created yet no resource
+  // providers have been added and we do not need to publish anything.
+  if (resourceProviderManager == nullptr) {
+    // We check whether the passed additional resources are compatible
+    // with the expectation that no resource provider resources are in
+    // use, yet. This is not an exhaustive consistency check.
+    if (additionalResources.isSome()) {
+      foreach (const Resource& resource, additionalResources.get()) {
+        CHECK(!resource.has_provider_id())
+          << "Cannot publish resource provider resources "
+          << additionalResources.get()
+          << " until resource providers have subscribed";
+      }
+    }
+
+    return Nothing();
+  }
+
   Resources resources;
 
   // NOTE: For resources providers that serve quantity-based resources
@@ -8134,7 +8171,8 @@ Future<Nothing> Slave::publishResources(
     resources += additionalResources.get();
   }
 
-  return resourceProviderManager.publishResources(resources);
+  return CHECK_NOTNULL(resourceProviderManager.get())
+    ->publishResources(resources);
 }
 
 
@@ -8754,6 +8792,26 @@ double Slave::_resources_revocable_percent(const string& name)
 }
 
 
+void Slave::initializeResourceProviderManager(
+    const Flags& flags,
+    const SlaveID& slaveId)
+{
+  // To simplify reasoning about lifetimes we do not allow
+  // reinitialization of the resource provider manager.
+  if (resourceProviderManager.get() != nullptr) {
+    return;
+  }
+
+  resourceProviderManager.reset(new ResourceProviderManager());
+
+  if (capabilities.resourceProvider) {
+    // Start listening for messages from the resource provider manager.
+    resourceProviderManager->messages().get().onAny(
+        defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+  }
+}
+
+
 Framework::Framework(
     Slave* _slave,
     const Flags& slaveFlags,

http://git-wip-us.apache.org/repos/asf/mesos/blob/0424a662/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index c35996b..c3866c6 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -711,6 +711,10 @@ private:
       const SlaveInfo& previous,
       const SlaveInfo& current) const;
 
+  void initializeResourceProviderManager(
+      const Flags& flags,
+      const SlaveID& slaveId);
+
   protobuf::master::Capabilities requiredMasterCapabilities;
 
   const Flags flags;
@@ -812,7 +816,7 @@ private:
   // (allocated and oversubscribable) resources.
   Option<Resources> oversubscribedResources;
 
-  ResourceProviderManager resourceProviderManager;
+  process::Owned<ResourceProviderManager> resourceProviderManager;
   process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
 
   // Local resource providers known by the agent.

http://git-wip-us.apache.org/repos/asf/mesos/blob/0424a662/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index c52541b..0de4e79 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -755,14 +755,17 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
-
   Owned<MasterDetector> detector = master.get()->createDetector();
 
+  // For the agent's resource provider manager to start,
+  // the agent needs to have been assigned an agent ID.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
   Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
   ASSERT_SOME(agent);
 
-  AWAIT_READY(__recover);
+  AWAIT_READY(slaveRegisteredMessage);
 
   // Wait for recovery to be complete.
   Clock::pause();