You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/04/25 15:13:13 UTC

[9/9] mesos git commit: Revert "Delayed construction of the agent's resource provider manager."

Revert "Delayed construction of the agent's resource provider manager."

This reverts commit 0424a6623d08440d8dbe5aff5ec2f18df7b93e24.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a1c6a7a3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a1c6a7a3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a1c6a7a3

Branch: refs/heads/master
Commit: a1c6a7a3c54518f97a34f28b1792885b928b948c
Parents: ed92ee4
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:09:33 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:09:33 2018 +0200

----------------------------------------------------------------------
 src/slave/slave.cpp                           | 94 +++++-----------------
 src/slave/slave.hpp                           |  6 +-
 src/tests/resource_provider_manager_tests.cpp |  9 +--
 3 files changed, 22 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d313777..d0ff5f8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -30,8 +30,6 @@
 #include <utility>
 #include <vector>
 
-#include <glog/logging.h>
-
 #include <mesos/type_utils.hpp>
 
 #include <mesos/authentication/secret_generator.hpp>
@@ -774,20 +772,15 @@ void Slave::initialize()
           logRequest(request);
           return http.executor(request, principal);
         });
-  route(
-      "/api/v1/resource_provider",
-      READWRITE_HTTP_AUTHENTICATION_REALM,
-      Http::RESOURCE_PROVIDER_HELP(),
-      [this](const http::Request& request, const Option<Principal>& principal)
-        -> Future<http::Response> {
-        logRequest(request);
-
-        if (resourceProviderManager.get() == nullptr) {
-          return http::ServiceUnavailable();
-        }
 
-        return resourceProviderManager->api(request, principal);
-      });
+  route("/api/v1/resource_provider",
+        READWRITE_HTTP_AUTHENTICATION_REALM,
+        Http::RESOURCE_PROVIDER_HELP(),
+        [this](const http::Request& request,
+               const Option<Principal>& principal) {
+          logRequest(request);
+          return resourceProviderManager.api(request, principal);
+        });
 
   // TODO(ijimenez): Remove this endpoint at the end of the
   // deprecation cycle on 0.26.
@@ -1509,8 +1502,6 @@ void Slave::registered(
 
       CHECK_SOME(state::checkpoint(path, info));
 
-      initializeResourceProviderManager(flags, info.id());
-
       // We start the local resource providers daemon once the agent is
       // running, so the resource providers can use the agent API.
       localResourceProviderDaemon->start(info.id());
@@ -4353,7 +4344,7 @@ void Slave::applyOperation(const ApplyOperationMessage& message)
   }
 
   if (resourceProviderId.isSome()) {
-    CHECK_NOTNULL(resourceProviderManager.get())->applyOperation(message);
+    resourceProviderManager.applyOperation(message);
     return;
   }
 
@@ -4426,7 +4417,7 @@ void Slave::reconcileOperations(const ReconcileOperationsMessage& message)
   }
 
   if (containsResourceProviderOperations) {
-    CHECK_NOTNULL(resourceProviderManager.get())->reconcileOperations(message);
+    resourceProviderManager.reconcileOperations(message);
   }
 }
 
@@ -4558,19 +4549,7 @@ void Slave::operationStatusAcknowledgement(
 {
   Operation* operation = getOperation(acknowledgement.operation_uuid());
   if (operation != nullptr) {
-    // If the operation was on resource provider resources forward the
-    // acknowledgement to the resource provider manager as well.
-    Result<ResourceProviderID> resourceProviderId =
-      getResourceProviderId(operation->info());
-
-    CHECK(!resourceProviderId.isError())
-      << "Could not determine resource provider of operation " << operation
-      << ": " << resourceProviderId.error();
-
-    if (resourceProviderId.isSome()) {
-      CHECK_NOTNULL(resourceProviderManager.get())
-        ->acknowledgeOperationStatus(acknowledgement);
-    }
+    resourceProviderManager.acknowledgeOperationStatus(acknowledgement);
 
     CHECK(operation->statuses_size() > 0);
     if (protobuf::isTerminalState(
@@ -7340,8 +7319,10 @@ void Slave::__recover(const Future<Nothing>& future)
     detection = detector->detect()
       .onAny(defer(self(), &Slave::detected, lambda::_1));
 
-    if (info.has_id()) {
-      initializeResourceProviderManager(flags, info.id());
+    if (capabilities.resourceProvider) {
+      // Start listening for messages from the resource provider manager.
+      resourceProviderManager.messages().get().onAny(
+          defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
     }
 
     // Forward oversubscribed resources.
@@ -7619,7 +7600,7 @@ void Slave::handleResourceProviderMessage(
                << (message.isFailed() ? message.failure() : "future discarded");
 
     // Wait for the next message.
-    CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
+    resourceProviderManager.messages().get()
       .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
 
     return;
@@ -7878,7 +7859,7 @@ void Slave::handleResourceProviderMessage(
   }
 
   // Wait for the next message.
-  CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
+  resourceProviderManager.messages().get()
     .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
 }
 
@@ -8133,24 +8114,6 @@ void Slave::apply(Operation* operation)
 Future<Nothing> Slave::publishResources(
     const Option<Resources>& additionalResources)
 {
-  // If the resource provider manager has not been created yet no resource
-  // providers have been added and we do not need to publish anything.
-  if (resourceProviderManager == nullptr) {
-    // We check whether the passed additional resources are compatible
-    // with the expectation that no resource provider resources are in
-    // use, yet. This is not an exhaustive consistency check.
-    if (additionalResources.isSome()) {
-      foreach (const Resource& resource, additionalResources.get()) {
-        CHECK(!resource.has_provider_id())
-          << "Cannot publish resource provider resources "
-          << additionalResources.get()
-          << " until resource providers have subscribed";
-      }
-    }
-
-    return Nothing();
-  }
-
   Resources resources;
 
   // NOTE: For resources providers that serve quantity-based resources
@@ -8171,8 +8134,7 @@ Future<Nothing> Slave::publishResources(
     resources += additionalResources.get();
   }
 
-  return CHECK_NOTNULL(resourceProviderManager.get())
-    ->publishResources(resources);
+  return resourceProviderManager.publishResources(resources);
 }
 
 
@@ -8792,26 +8754,6 @@ double Slave::_resources_revocable_percent(const string& name)
 }
 
 
-void Slave::initializeResourceProviderManager(
-    const Flags& flags,
-    const SlaveID& slaveId)
-{
-  // To simplify reasoning about lifetimes we do not allow
-  // reinitialization of the resource provider manager.
-  if (resourceProviderManager.get() != nullptr) {
-    return;
-  }
-
-  resourceProviderManager.reset(new ResourceProviderManager());
-
-  if (capabilities.resourceProvider) {
-    // Start listening for messages from the resource provider manager.
-    resourceProviderManager->messages().get().onAny(
-        defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
-  }
-}
-
-
 Framework::Framework(
     Slave* _slave,
     const Flags& slaveFlags,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index c3866c6..c35996b 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -711,10 +711,6 @@ private:
       const SlaveInfo& previous,
       const SlaveInfo& current) const;
 
-  void initializeResourceProviderManager(
-      const Flags& flags,
-      const SlaveID& slaveId);
-
   protobuf::master::Capabilities requiredMasterCapabilities;
 
   const Flags flags;
@@ -816,7 +812,7 @@ private:
   // (allocated and oversubscribable) resources.
   Option<Resources> oversubscribedResources;
 
-  process::Owned<ResourceProviderManager> resourceProviderManager;
+  ResourceProviderManager resourceProviderManager;
   process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
 
   // Local resource providers known by the agent.

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 0de4e79..c52541b 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -755,17 +755,14 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Owned<MasterDetector> detector = master.get()->createDetector();
+  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
 
-  // For the agent's resource provider manager to start,
-  // the agent needs to have been assigned an agent ID.
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+  Owned<MasterDetector> detector = master.get()->createDetector();
 
   Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
   ASSERT_SOME(agent);
 
-  AWAIT_READY(slaveRegisteredMessage);
+  AWAIT_READY(__recover);
 
   // Wait for recovery to be complete.
   Clock::pause();