You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/04/25 15:13:13 UTC
[9/9] mesos git commit: Revert "Delayed construction of the agent's
resource provider manager."
Revert "Delayed construction of the agent's resource provider manager."
This reverts commit 0424a6623d08440d8dbe5aff5ec2f18df7b93e24.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a1c6a7a3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a1c6a7a3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a1c6a7a3
Branch: refs/heads/master
Commit: a1c6a7a3c54518f97a34f28b1792885b928b948c
Parents: ed92ee4
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:09:33 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:09:33 2018 +0200
----------------------------------------------------------------------
src/slave/slave.cpp | 94 +++++-----------------
src/slave/slave.hpp | 6 +-
src/tests/resource_provider_manager_tests.cpp | 9 +--
3 files changed, 22 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d313777..d0ff5f8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -30,8 +30,6 @@
#include <utility>
#include <vector>
-#include <glog/logging.h>
-
#include <mesos/type_utils.hpp>
#include <mesos/authentication/secret_generator.hpp>
@@ -774,20 +772,15 @@ void Slave::initialize()
logRequest(request);
return http.executor(request, principal);
});
- route(
- "/api/v1/resource_provider",
- READWRITE_HTTP_AUTHENTICATION_REALM,
- Http::RESOURCE_PROVIDER_HELP(),
- [this](const http::Request& request, const Option<Principal>& principal)
- -> Future<http::Response> {
- logRequest(request);
-
- if (resourceProviderManager.get() == nullptr) {
- return http::ServiceUnavailable();
- }
- return resourceProviderManager->api(request, principal);
- });
+ route("/api/v1/resource_provider",
+ READWRITE_HTTP_AUTHENTICATION_REALM,
+ Http::RESOURCE_PROVIDER_HELP(),
+ [this](const http::Request& request,
+ const Option<Principal>& principal) {
+ logRequest(request);
+ return resourceProviderManager.api(request, principal);
+ });
// TODO(ijimenez): Remove this endpoint at the end of the
// deprecation cycle on 0.26.
@@ -1509,8 +1502,6 @@ void Slave::registered(
CHECK_SOME(state::checkpoint(path, info));
- initializeResourceProviderManager(flags, info.id());
-
// We start the local resource providers daemon once the agent is
// running, so the resource providers can use the agent API.
localResourceProviderDaemon->start(info.id());
@@ -4353,7 +4344,7 @@ void Slave::applyOperation(const ApplyOperationMessage& message)
}
if (resourceProviderId.isSome()) {
- CHECK_NOTNULL(resourceProviderManager.get())->applyOperation(message);
+ resourceProviderManager.applyOperation(message);
return;
}
@@ -4426,7 +4417,7 @@ void Slave::reconcileOperations(const ReconcileOperationsMessage& message)
}
if (containsResourceProviderOperations) {
- CHECK_NOTNULL(resourceProviderManager.get())->reconcileOperations(message);
+ resourceProviderManager.reconcileOperations(message);
}
}
@@ -4558,19 +4549,7 @@ void Slave::operationStatusAcknowledgement(
{
Operation* operation = getOperation(acknowledgement.operation_uuid());
if (operation != nullptr) {
- // If the operation was on resource provider resources forward the
- // acknowledgement to the resource provider manager as well.
- Result<ResourceProviderID> resourceProviderId =
- getResourceProviderId(operation->info());
-
- CHECK(!resourceProviderId.isError())
- << "Could not determine resource provider of operation " << operation
- << ": " << resourceProviderId.error();
-
- if (resourceProviderId.isSome()) {
- CHECK_NOTNULL(resourceProviderManager.get())
- ->acknowledgeOperationStatus(acknowledgement);
- }
+ resourceProviderManager.acknowledgeOperationStatus(acknowledgement);
CHECK(operation->statuses_size() > 0);
if (protobuf::isTerminalState(
@@ -7340,8 +7319,10 @@ void Slave::__recover(const Future<Nothing>& future)
detection = detector->detect()
.onAny(defer(self(), &Slave::detected, lambda::_1));
- if (info.has_id()) {
- initializeResourceProviderManager(flags, info.id());
+ if (capabilities.resourceProvider) {
+ // Start listening for messages from the resource provider manager.
+ resourceProviderManager.messages().get().onAny(
+ defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
}
// Forward oversubscribed resources.
@@ -7619,7 +7600,7 @@ void Slave::handleResourceProviderMessage(
<< (message.isFailed() ? message.failure() : "future discarded");
// Wait for the next message.
- CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
+ resourceProviderManager.messages().get()
.onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
return;
@@ -7878,7 +7859,7 @@ void Slave::handleResourceProviderMessage(
}
// Wait for the next message.
- CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
+ resourceProviderManager.messages().get()
.onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
}
@@ -8133,24 +8114,6 @@ void Slave::apply(Operation* operation)
Future<Nothing> Slave::publishResources(
const Option<Resources>& additionalResources)
{
- // If the resource provider manager has not been created yet no resource
- // providers have been added and we do not need to publish anything.
- if (resourceProviderManager == nullptr) {
- // We check whether the passed additional resources are compatible
- // with the expectation that no resource provider resources are in
- // use, yet. This is not an exhaustive consistency check.
- if (additionalResources.isSome()) {
- foreach (const Resource& resource, additionalResources.get()) {
- CHECK(!resource.has_provider_id())
- << "Cannot publish resource provider resources "
- << additionalResources.get()
- << " until resource providers have subscribed";
- }
- }
-
- return Nothing();
- }
-
Resources resources;
// NOTE: For resources providers that serve quantity-based resources
@@ -8171,8 +8134,7 @@ Future<Nothing> Slave::publishResources(
resources += additionalResources.get();
}
- return CHECK_NOTNULL(resourceProviderManager.get())
- ->publishResources(resources);
+ return resourceProviderManager.publishResources(resources);
}
@@ -8792,26 +8754,6 @@ double Slave::_resources_revocable_percent(const string& name)
}
-void Slave::initializeResourceProviderManager(
- const Flags& flags,
- const SlaveID& slaveId)
-{
- // To simplify reasoning about lifetimes we do not allow
- // reinitialization of the resource provider manager.
- if (resourceProviderManager.get() != nullptr) {
- return;
- }
-
- resourceProviderManager.reset(new ResourceProviderManager());
-
- if (capabilities.resourceProvider) {
- // Start listening for messages from the resource provider manager.
- resourceProviderManager->messages().get().onAny(
- defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
- }
-}
-
-
Framework::Framework(
Slave* _slave,
const Flags& slaveFlags,
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index c3866c6..c35996b 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -711,10 +711,6 @@ private:
const SlaveInfo& previous,
const SlaveInfo& current) const;
- void initializeResourceProviderManager(
- const Flags& flags,
- const SlaveID& slaveId);
-
protobuf::master::Capabilities requiredMasterCapabilities;
const Flags flags;
@@ -816,7 +812,7 @@ private:
// (allocated and oversubscribable) resources.
Option<Resources> oversubscribedResources;
- process::Owned<ResourceProviderManager> resourceProviderManager;
+ ResourceProviderManager resourceProviderManager;
process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
// Local resource providers known by the agent.
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 0de4e79..c52541b 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -755,17 +755,14 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
- Owned<MasterDetector> detector = master.get()->createDetector();
+ Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
- // For the agent's resource provider manager to start,
- // the agent needs to have been assigned an agent ID.
- Future<SlaveRegisteredMessage> slaveRegisteredMessage =
- FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+ Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
ASSERT_SOME(agent);
- AWAIT_READY(slaveRegisteredMessage);
+ AWAIT_READY(__recover);
// Wait for recovery to be complete.
Clock::pause();