You are viewing a plain text version of this content. The canonical link was not preserved in this plain text copy.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/04/25 15:13:05 UTC
[1/9] mesos git commit: Revert "Removed redundant master flags in
resource provider tests."
Repository: mesos
Updated Branches:
refs/heads/master 1cbda33bb -> a1c6a7a3c
Revert "Removed redundant master flags in resource provider tests."
This reverts commit b1538db6b9d12633195e0d5a0eff48246ea0d4ad.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0f3ec500
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0f3ec500
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0f3ec500
Branch: refs/heads/master
Commit: 0f3ec500d04c860b2334e6bea040f718dfece555
Parents: 1cbda33
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:07:01 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:07:01 2018 +0200
----------------------------------------------------------------------
src/tests/resource_provider_manager_tests.cpp | 12 ++++++++----
1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/0f3ec500/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 889cb32..1ee4dc3 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -914,7 +914,8 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
{
// Start master and agent.
- Try<Owned<cluster::Master>> master = StartMaster();
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
@@ -1049,7 +1050,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
Clock::pause();
// Start master and agent.
- Try<Owned<cluster::Master>> master = StartMaster();
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
@@ -1303,7 +1305,8 @@ TEST_F(ResourceProviderManagerHttpApiTest, ResourceProviderSubscribeDisconnect)
Clock::pause();
// Start master and agent.
- Try<Owned<cluster::Master>> master = StartMaster();
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
@@ -1379,7 +1382,8 @@ TEST_F(ResourceProviderManagerHttpApiTest, Metrics)
Clock::pause();
// Start master and agent.
- Try<Owned<cluster::Master>> master = StartMaster();
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
ASSERT_SOME(master);
Owned<MasterDetector> detector = master.get()->createDetector();
[2/9] mesos git commit: Revert "Prevent resubscription of resource
providers with unknown IDs."
Posted by al...@apache.org.
Revert "Prevent resubscription of resource providers with unknown IDs."
This reverts commit 1679d4c85c0bd095e2dafaf7a95adde258db8179.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f13a6e22
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f13a6e22
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f13a6e22
Branch: refs/heads/master
Commit: f13a6e22f7019f9bd5e8941f54fe4616df1972f4
Parents: 0f3ec50
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:07:10 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:07:10 2018 +0200
----------------------------------------------------------------------
src/resource_provider/manager.cpp | 13 +----
src/tests/resource_provider_manager_tests.cpp | 61 ----------------------
2 files changed, 1 insertion(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f13a6e22/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 05911de..1219bb7 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -678,18 +678,7 @@ void ResourceProviderManagerProcess::subscribe(
// restarted or an agent failover. The 'ResourceProviderInfo' might
// have been updated, but its type and name should remain the same.
// We should checkpoint its 'type', 'name' and ID, then check if the
- // resubscription is consistent with the checkpointed record.
-
- const ResourceProviderID& resourceProviderId = resourceProviderInfo.id();
-
- if (!resourceProviders.known.contains(resourceProviderId)) {
- LOG(INFO)
- << "Dropping resubscription attempt of resource provider with ID "
- << resourceProviderId
- << " since it is unknown";
-
- return;
- }
+ // resubscribption is consistent with the checkpointed record.
// If the resource provider is known we do not need to admit it
// again, and the registrar operation implicitly succeeded.
http://git-wip-us.apache.org/repos/asf/mesos/blob/f13a6e22/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 1ee4dc3..8c364a1 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -1150,67 +1150,6 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
}
-// Test that when a resource provider attempts to resubscribe with an
-// unknown ID it is not admitted but disconnected.
-TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeUnknownID)
-{
- Clock::pause();
-
- // Start master and agent.
- Try<Owned<cluster::Master>> master = StartMaster();
- ASSERT_SOME(master);
-
- Owned<MasterDetector> detector = master.get()->createDetector();
-
- slave::Flags slaveFlags = CreateSlaveFlags();
-
- // For the agent's resource provider manager to start,
- // the agent needs to have been assigned an agent ID.
- Future<SlaveRegisteredMessage> slaveRegisteredMessage =
- FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
- Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
- ASSERT_SOME(agent);
-
- Clock::advance(slaveFlags.registration_backoff_factor);
- Clock::settle();
-
- AWAIT_READY(slaveRegisteredMessage);
-
- mesos::v1::ResourceProviderID resourceProviderId;
- resourceProviderId.set_value(id::UUID::random().toString());
-
- mesos::v1::ResourceProviderInfo resourceProviderInfo;
- resourceProviderInfo.mutable_id()->CopyFrom(resourceProviderId);
- resourceProviderInfo.set_type("org.apache.mesos.rp.test");
- resourceProviderInfo.set_name("test");
-
- Owned<v1::MockResourceProvider> resourceProvider(
- new v1::MockResourceProvider(resourceProviderInfo));
-
- // We explicitly reset the resource provider after the expected
- // disconnect to prevent it from resubscribing indefinitely.
- Future<Nothing> disconnected;
- EXPECT_CALL(*resourceProvider, disconnected())
- .WillOnce(DoAll(
- Invoke([&resourceProvider]() { resourceProvider.reset(); }),
- FutureSatisfy(&disconnected)));
-
- // Start and register a resource provider.
- Owned<EndpointDetector> endpointDetector(
- resource_provider::createEndpointDetector(agent.get()->pid));
-
- const ContentType contentType = GetParam();
-
- resourceProvider->start(
- endpointDetector,
- contentType,
- v1::DEFAULT_CREDENTIAL);
-
- AWAIT_READY(disconnected);
-}
-
-
// This test verifies that a disconnected resource provider will
// result in an `UpdateSlaveMessage` to be sent to the master and the
// total resources of the disconnected resource provider will be
[3/9] mesos git commit: Revert "Added admitted resource providers to
the manager's registry."
Posted by al...@apache.org.
Revert "Added admitted resource providers to the manager's registry."
This reverts commit 23ff6622fde41f40ee1d4ee914b5302c7b25de01.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2855a6b6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2855a6b6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2855a6b6
Branch: refs/heads/master
Commit: 2855a6b607510e4114b22bddf43197c72176995e
Parents: f13a6e2
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:07:20 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:07:20 2018 +0200
----------------------------------------------------------------------
src/resource_provider/manager.cpp | 45 ++--------------------
src/resource_provider/registrar.cpp | 10 -----
src/resource_provider/registry.proto | 1 -
src/tests/resource_provider_manager_tests.cpp | 28 ++++----------
4 files changed, 11 insertions(+), 73 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/2855a6b6/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 1219bb7..32f23a7 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -58,7 +58,6 @@ using std::string;
using mesos::internal::resource_provider::validation::call::validate;
-using mesos::resource_provider::AdmitResourceProvider;
using mesos::resource_provider::Call;
using mesos::resource_provider::Event;
using mesos::resource_provider::Registrar;
@@ -185,10 +184,6 @@ private:
const HttpConnection& http,
const Call::Subscribe& subscribe);
- void _subscribe(
- const Future<bool>& admitResourceProvider,
- Owned<ResourceProvider> resourceProvider);
-
void updateOperationStatus(
ResourceProvider* resourceProvider,
const Call::UpdateOperationStatus& update);
@@ -662,53 +657,17 @@ void ResourceProviderManagerProcess::subscribe(
Owned<ResourceProvider> resourceProvider(
new ResourceProvider(resourceProviderInfo, http));
- Future<bool> admitResourceProvider;
-
if (!resourceProviderInfo.has_id()) {
// The resource provider is subscribing for the first time.
resourceProvider->info.mutable_id()->CopyFrom(newResourceProviderId());
-
- // If we are handing out a new `ResourceProviderID` persist the ID by
- // triggering a `AdmitResourceProvider` operation on the registrar.
- admitResourceProvider =
- registrar->apply(Owned<mesos::resource_provider::Registrar::Operation>(
- new AdmitResourceProvider(resourceProvider->info.id())));
} else {
// TODO(chhsiao): The resource provider is resubscribing after being
// restarted or an agent failover. The 'ResourceProviderInfo' might
// have been updated, but its type and name should remain the same.
// We should checkpoint its 'type', 'name' and ID, then check if the
// resubscribption is consistent with the checkpointed record.
-
- // If the resource provider is known we do not need to admit it
- // again, and the registrar operation implicitly succeeded.
- admitResourceProvider = true;
}
- admitResourceProvider.onAny(defer(
- self(),
- &ResourceProviderManagerProcess::_subscribe,
- lambda::_1,
- std::move(resourceProvider)));
-}
-
-
-void ResourceProviderManagerProcess::_subscribe(
- const Future<bool>& admitResourceProvider,
- Owned<ResourceProvider> resourceProvider)
-{
- if (!admitResourceProvider.isReady()) {
- LOG(INFO)
- << "Not subscribing resource provider " << resourceProvider->info.id()
- << " as registry update did not succeed: " << admitResourceProvider;
-
- return;
- }
-
- CHECK(admitResourceProvider.get())
- << "Could not admit resource provider " << resourceProvider->info.id()
- << " as registry update was rejected";
-
const ResourceProviderID& resourceProviderId = resourceProvider->info.id();
Event event;
@@ -722,7 +681,7 @@ void ResourceProviderManagerProcess::_subscribe(
return;
}
- resourceProvider->http.closed()
+ http.closed()
.onAny(defer(self(), [=](const Future<Nothing>& future) {
// Iff the remote side closes the HTTP connection, the future will be
// ready. We will remove the resource provider in that case.
@@ -757,6 +716,8 @@ void ResourceProviderManagerProcess::_subscribe(
resourceProviders.known.put(
resourceProviderId,
std::move(resourceProvider_));
+
+ // TODO(bbannier): Persist this information in the registry.
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/2855a6b6/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index c506ec1..d7ec6a6 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -115,15 +115,6 @@ Try<bool> AdmitResourceProvider::perform(Registry* registry)
return Error("Resource provider already admitted");
}
- if (std::find_if(
- registry->removed_resource_providers().begin(),
- registry->removed_resource_providers().end(),
- [this](const ResourceProvider& resourceProvider) {
- return resourceProvider.id() == this->id;
- }) != registry->removed_resource_providers().end()) {
- return Error("Resource provider was removed");
- }
-
ResourceProvider resourceProvider;
resourceProvider.mutable_id()->CopyFrom(id);
@@ -150,7 +141,6 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
return Error("Attempted to remove an unknown resource provider");
}
- registry->add_removed_resource_providers()->CopyFrom(*pos);
registry->mutable_resource_providers()->erase(pos);
return true; // Mutation.
http://git-wip-us.apache.org/repos/asf/mesos/blob/2855a6b6/src/resource_provider/registry.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.proto b/src/resource_provider/registry.proto
index 491263e..14bd433 100644
--- a/src/resource_provider/registry.proto
+++ b/src/resource_provider/registry.proto
@@ -35,5 +35,4 @@ message ResourceProvider {
// A top level object that is managed by the Registrar and persisted.
message Registry {
repeated ResourceProvider resource_providers = 1;
- repeated ResourceProvider removed_resource_providers = 2;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/2855a6b6/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 8c364a1..eb8e4fc 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -843,24 +843,17 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
AWAIT_READY(registrar.get()->recover());
- Future<bool> admitResourceProvider1 =
+ Future<bool> admitResourceProvider =
registrar.get()->apply(Owned<Registrar::Operation>(
new AdmitResourceProvider(resourceProviderId)));
- AWAIT_READY(admitResourceProvider1);
- EXPECT_TRUE(admitResourceProvider1.get());
+ AWAIT_READY(admitResourceProvider);
+ EXPECT_TRUE(admitResourceProvider.get());
Future<bool> removeResourceProvider =
registrar.get()->apply(Owned<Registrar::Operation>(
new RemoveResourceProvider(resourceProviderId)));
AWAIT_READY(removeResourceProvider);
EXPECT_TRUE(removeResourceProvider.get());
-
- // A removed resource provider cannot be admitted again.
- Future<bool> admitResourceProvider2 =
- registrar.get()->apply(Owned<Registrar::Operation>(
- new AdmitResourceProvider(resourceProviderId)));
- AWAIT_READY(admitResourceProvider2);
- EXPECT_FALSE(admitResourceProvider2.get());
}
@@ -886,24 +879,19 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
ASSERT_SOME(registrar);
ASSERT_NE(nullptr, registrar->get());
- Future<bool> admitResourceProvider1 =
+ AWAIT_READY(masterRegistrar.recover(masterInfo));
+
+ Future<bool> admitResourceProvider =
registrar.get()->apply(Owned<Registrar::Operation>(
new AdmitResourceProvider(resourceProviderId)));
- AWAIT_READY(admitResourceProvider1);
- EXPECT_TRUE(admitResourceProvider1.get());
+ AWAIT_READY(admitResourceProvider);
+ EXPECT_TRUE(admitResourceProvider.get());
Future<bool> removeResourceProvider =
registrar.get()->apply(Owned<Registrar::Operation>(
new RemoveResourceProvider(resourceProviderId)));
AWAIT_READY(removeResourceProvider);
EXPECT_TRUE(removeResourceProvider.get());
-
- // A removed resource provider cannot be admitted again.
- Future<bool> admitResourceProvider2 =
- registrar.get()->apply(Owned<Registrar::Operation>(
- new AdmitResourceProvider(resourceProviderId)));
- AWAIT_READY(admitResourceProvider2);
- EXPECT_FALSE(admitResourceProvider2.get());
}
[6/9] mesos git commit: Revert "Passed on registrar when constructing
resource provider manager."
Posted by al...@apache.org.
Revert "Passed on registrar when constructing resource provider manager."
This reverts commit e1e9a041be55182fc4d683d973f0a2cd25919cf1.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1e49463
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1e49463
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1e49463
Branch: refs/heads/master
Commit: b1e4946372a06ebffe309056a3a64bbda1bf875b
Parents: f670e2b
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:09:11 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:09:11 2018 +0200
----------------------------------------------------------------------
src/resource_provider/manager.cpp | 15 ++++--------
src/resource_provider/manager.hpp | 5 +---
src/resource_provider/registrar.cpp | 5 +---
src/slave/slave.cpp | 27 +--------------------
src/tests/resource_provider_manager_tests.cpp | 28 +++++++---------------
5 files changed, 17 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b1e49463/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 259a810..68e1866 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -58,7 +58,6 @@ using mesos::internal::resource_provider::validation::call::validate;
using mesos::resource_provider::Call;
using mesos::resource_provider::Event;
-using mesos::resource_provider::Registrar;
using process::Failure;
using process::Future;
@@ -158,7 +157,7 @@ class ResourceProviderManagerProcess
: public Process<ResourceProviderManagerProcess>
{
public:
- ResourceProviderManagerProcess(Owned<Registrar> _registrar);
+ ResourceProviderManagerProcess();
Future<http::Response> api(
const http::Request& request,
@@ -213,13 +212,9 @@ private:
};
-ResourceProviderManagerProcess::ResourceProviderManagerProcess(
- Owned<Registrar> _registrar)
+ResourceProviderManagerProcess::ResourceProviderManagerProcess()
: ProcessBase(process::ID::generate("resource-provider-manager")),
- metrics(*this)
-{
- CHECK_NOTNULL(_registrar.get());
-}
+ metrics(*this) {}
Future<http::Response> ResourceProviderManagerProcess::api(
@@ -768,8 +763,8 @@ ResourceProviderManagerProcess::Metrics::~Metrics()
}
-ResourceProviderManager::ResourceProviderManager(Owned<Registrar> registrar)
- : process(new ResourceProviderManagerProcess(std::move(registrar)))
+ResourceProviderManager::ResourceProviderManager()
+ : process(new ResourceProviderManagerProcess())
{
spawn(CHECK_NOTNULL(process.get()));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/b1e49463/src/resource_provider/manager.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index 6c57956..bc017fa 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -26,7 +26,6 @@
#include "messages/messages.hpp"
#include "resource_provider/message.hpp"
-#include "resource_provider/registrar.hpp"
namespace mesos {
namespace internal {
@@ -38,9 +37,7 @@ class ResourceProviderManagerProcess;
class ResourceProviderManager
{
public:
- ResourceProviderManager(
- process::Owned<resource_provider::Registrar> registrar);
-
+ ResourceProviderManager();
~ResourceProviderManager();
ResourceProviderManager(
http://git-wip-us.apache.org/repos/asf/mesos/blob/b1e49463/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index b151e2b..53b403e 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -185,10 +185,7 @@ private:
GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
: ProcessBase(process::ID::generate("resource-provider-generic-registrar")),
storage(std::move(_storage)),
- state(storage.get())
-{
- CHECK_NOTNULL(storage.get());
-}
+ state(storage.get()) {}
Future<Nothing> GenericRegistrarProcess::recover()
http://git-wip-us.apache.org/repos/asf/mesos/blob/b1e49463/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 2632772..d313777 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -38,9 +38,6 @@
#include <mesos/module/authenticatee.hpp>
-#include <mesos/state/leveldb.hpp>
-#include <mesos/state/in_memory.hpp>
-
#include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
#include <process/after.hpp>
@@ -8805,29 +8802,7 @@ void Slave::initializeResourceProviderManager(
return;
}
- // The registrar uses LevelDB as underlying storage. Since LevelDB
- // is currently not supported on Windows (see MESOS-5932), we fall
- // back to in-memory storage there.
- //
- // TODO(bbannier): Remove this Windows workaround once MESOS-5932 is fixed.
-#ifndef __WINDOWS__
- Owned<mesos::state::Storage> storage(new mesos::state::LevelDBStorage(
- paths::getResourceProviderRegistryPath(flags.work_dir, slaveId)));
-#else
- LOG(WARNING)
- << "Persisting resource provider manager state is not supported on Windows";
- Owned<mesos::state::Storage> storage(new mesos::state::InMemoryStorage());
-#endif // __WINDOWS__
-
- Try<Owned<resource_provider::Registrar>> resourceProviderRegistrar =
- resource_provider::Registrar::create(std::move(storage));
-
- CHECK_SOME(resourceProviderRegistrar)
- << "Could not construct resource provider registrar: "
- << resourceProviderRegistrar.error();
-
- resourceProviderManager.reset(
- new ResourceProviderManager(std::move(resourceProviderRegistrar.get())));
+ resourceProviderManager.reset(new ResourceProviderManager());
if (capabilities.resourceProvider) {
// Start listening for messages from the resource provider manager.
http://git-wip-us.apache.org/repos/asf/mesos/blob/b1e49463/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 1664073..ceb7854 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -67,7 +67,6 @@ using mesos::master::detector::MasterDetector;
using mesos::state::InMemoryStorage;
using mesos::state::State;
-using mesos::state::Storage;
using mesos::resource_provider::AdmitResourceProvider;
using mesos::resource_provider::Registrar;
@@ -130,8 +129,7 @@ TEST_F(ResourceProviderManagerHttpApiTest, NoContentType)
request.method = "POST";
request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
- ResourceProviderManager manager(
- Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+ ResourceProviderManager manager;
Future<http::Response> response = manager.api(request, None());
@@ -156,8 +154,7 @@ TEST_F(ResourceProviderManagerHttpApiTest, ValidJsonButInvalidProtobuf)
request.headers["Content-Type"] = APPLICATION_JSON;
request.body = stringify(object);
- ResourceProviderManager manager(
- Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+ ResourceProviderManager manager;
Future<http::Response> response = manager.api(request, None());
@@ -180,8 +177,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, MalformedContent)
request.headers["Content-Type"] = stringify(contentType);
request.body = "MALFORMED_CONTENT";
- ResourceProviderManager manager(
- Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+ ResourceProviderManager manager;
Future<http::Response> response = manager.api(request, None());
@@ -227,8 +223,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, UnsupportedContentMediaType)
request.headers["Content-Type"] = unknownMediaType;
request.body = serialize(contentType, call);
- ResourceProviderManager manager(
- Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+ ResourceProviderManager manager;
Future<http::Response> response = manager.api(request, None());
@@ -240,8 +235,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
{
const ContentType contentType = GetParam();
- ResourceProviderManager manager(
- Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+ ResourceProviderManager manager;
Option<id::UUID> streamId;
Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -348,8 +342,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateOperationStatus)
{
const ContentType contentType = GetParam();
- ResourceProviderManager manager(
- Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+ ResourceProviderManager manager;
Option<id::UUID> streamId;
Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -467,8 +460,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesSuccess)
{
const ContentType contentType = GetParam();
- ResourceProviderManager manager(
- Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+ ResourceProviderManager manager;
Option<id::UUID> streamId;
Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -575,8 +567,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesFailure)
{
const ContentType contentType = GetParam();
- ResourceProviderManager manager(
- Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+ ResourceProviderManager manager;
Option<id::UUID> streamId;
Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -683,8 +674,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesDisconnected)
{
const ContentType contentType = GetParam();
- ResourceProviderManager manager(
- Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+ ResourceProviderManager manager;
Option<mesos::v1::ResourceProviderID> resourceProviderId;
Option<http::Pipe::Reader> reader;
[8/9] mesos git commit: Revert "Externalized creation of resource
provider manager backing storage."
Posted by al...@apache.org.
Revert "Externalized creation of resource provider manager backing storage."
This reverts commit 6f6413b618b4d7aec7c8f8e6fa9e3542f1af2b9c.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ed92ee4e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ed92ee4e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ed92ee4e
Branch: refs/heads/master
Commit: ed92ee4e61c44c4fe81da277bb68cee56c818fa7
Parents: 6d56382
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:09:24 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:09:24 2018 +0200
----------------------------------------------------------------------
src/resource_provider/registrar.cpp | 49 ++++++++++++++++++----
src/resource_provider/registrar.hpp | 15 +++----
src/tests/resource_provider_manager_tests.cpp | 27 +++++++++++-
3 files changed, 75 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/ed92ee4e/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index dbb55dd..92ef9ae 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -27,6 +27,10 @@
#include <mesos/state/in_memory.hpp>
+#ifndef __WINDOWS__
+#include <mesos/state/leveldb.hpp>
+#endif // __WINDOWS__
+
#include <mesos/state/protobuf.hpp>
#include <process/defer.hpp>
@@ -49,6 +53,12 @@ using std::string;
using mesos::resource_provider::registry::Registry;
using mesos::resource_provider::registry::ResourceProvider;
+using mesos::state::InMemoryStorage;
+
+#ifndef __WINDOWS__
+using mesos::state::LevelDBStorage;
+#endif // __WINDOWS__
+
using mesos::state::Storage;
using mesos::state::protobuf::Variable;
@@ -86,9 +96,11 @@ bool Registrar::Operation::set()
}
-Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage)
+Try<Owned<Registrar>> Registrar::create(
+ const slave::Flags& slaveFlags,
+ const SlaveID& slaveId)
{
- return new AgentRegistrar(std::move(storage));
+ return new AgentRegistrar(slaveFlags, slaveId);
}
@@ -148,7 +160,7 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
class AgentRegistrarProcess : public Process<AgentRegistrarProcess>
{
public:
- AgentRegistrarProcess(Owned<Storage> storage);
+ AgentRegistrarProcess(const slave::Flags& flags, const SlaveID& slaveId);
Future<Nothing> recover();
@@ -179,12 +191,33 @@ private:
deque<Owned<Registrar::Operation>> operations;
bool updating = false;
+
+ static Owned<Storage> createStorage(const std::string& path);
};
-AgentRegistrarProcess::AgentRegistrarProcess(Owned<Storage> _storage)
+Owned<Storage> AgentRegistrarProcess::createStorage(const std::string& path)
+{
+ // The registrar uses LevelDB as underlying storage. Since LevelDB
+ // is currently not supported on Windows (see MESOS-5932), we fall
+ // back to in-memory storage there.
+ //
+ // TODO(bbannier): Remove this Windows workaround once MESOS-5932 is fixed.
+#ifndef __WINDOWS__
+ return Owned<Storage>(new LevelDBStorage(path));
+#else
+ LOG(WARNING)
+ << "Persisting resource provider manager state is not supported on Windows";
+ return Owned<Storage>(new InMemoryStorage());
+#endif // __WINDOWS__
+}
+
+
+AgentRegistrarProcess::AgentRegistrarProcess(
+ const slave::Flags& flags, const SlaveID& slaveId)
: ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
- storage(std::move(_storage)),
+ storage(createStorage(slave::paths::getResourceProviderRegistryPath(
+ flags.work_dir, slaveId))),
state(storage.get()) {}
@@ -322,8 +355,10 @@ void AgentRegistrarProcess::_update(
}
-AgentRegistrar::AgentRegistrar(Owned<Storage> storage)
- : process(new AgentRegistrarProcess(std::move(storage)))
+AgentRegistrar::AgentRegistrar(
+ const slave::Flags& slaveFlags,
+ const SlaveID& slaveId)
+ : process(new AgentRegistrarProcess(slaveFlags, slaveId))
{
process::spawn(process.get(), false);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/ed92ee4e/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
index 34cb166..39f45b0 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -19,8 +19,6 @@
#include <memory>
-#include <mesos/state/storage.hpp>
-
#include <process/future.hpp>
#include <process/owned.hpp>
@@ -66,14 +64,15 @@ public:
bool success = false;
};
- // Create a registry on top of generic storage.
- static Try<process::Owned<Registrar>> create(
- process::Owned<state::Storage> storage);
-
// Create a registry on top of a master's persistent state.
static Try<process::Owned<Registrar>> create(
mesos::internal::master::Registrar* registrar);
+ // Create a registry on top of an agent's persistent state.
+ static Try<process::Owned<Registrar>> create(
+ const mesos::internal::slave::Flags& slaveFlags,
+ const SlaveID& slaveId);
+
virtual ~Registrar() = default;
virtual process::Future<Nothing> recover() = 0;
@@ -111,7 +110,9 @@ class AgentRegistrarProcess;
class AgentRegistrar : public Registrar
{
public:
- AgentRegistrar(process::Owned<state::Storage> storage);
+ AgentRegistrar(
+ const mesos::internal::slave::Flags& slaveFlags,
+ const SlaveID& slaveId);
~AgentRegistrar() override;
http://git-wip-us.apache.org/repos/asf/mesos/blob/ed92ee4e/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 72e8122..0de4e79 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -825,8 +825,31 @@ TEST_F(ResourceProviderRegistrarTest, AgentRegistrar)
ResourceProviderID resourceProviderId;
resourceProviderId.set_value("foo");
- Owned<mesos::state::Storage> storage(new mesos::state::InMemoryStorage());
- Try<Owned<Registrar>> registrar = Registrar::create(std::move(storage));
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ const slave::Flags flags = CreateSlaveFlags();
+
+ Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+ FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
+
+ Future<UpdateSlaveMessage> updateSlaveMessage =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+ ASSERT_SOME(slave);
+
+ AWAIT_READY(slaveRegisteredMessage);
+
+ // The agent will send `UpdateSlaveMessage` after it has created its
+ // meta directories. Await the message to make sure the agent
+ // registrar can create its store in the meta hierarchy.
+ AWAIT_READY(updateSlaveMessage);
+
+ Try<Owned<Registrar>> registrar =
+ Registrar::create(flags, slaveRegisteredMessage->slave_id());
ASSERT_SOME(registrar);
ASSERT_NE(nullptr, registrar->get());
[7/9] mesos git commit: Revert "Renamed resource provider
`AgentRegistrar` to `GenericRegistrar`."
Posted by al...@apache.org.
Revert "Renamed resource provider `AgentRegistrar` to `GenericRegistrar`."
This reverts commit f6becd6d6f3c8cee779903794ebcfb4f68405358.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6d563823
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6d563823
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6d563823
Branch: refs/heads/master
Commit: 6d56382316930f5075b9ae2ba8ff0c71d845af47
Parents: b1e4946
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:09:18 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:09:18 2018 +0200
----------------------------------------------------------------------
src/resource_provider/registrar.cpp | 35 +++++++++++-----------
src/resource_provider/registrar.hpp | 10 +++----
src/tests/resource_provider_manager_tests.cpp | 4 +--
3 files changed, 24 insertions(+), 25 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/6d563823/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index 53b403e..dbb55dd 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -88,7 +88,7 @@ bool Registrar::Operation::set()
Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage)
{
- return new GenericRegistrar(std::move(storage));
+ return new AgentRegistrar(std::move(storage));
}
@@ -145,10 +145,10 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
}
-class GenericRegistrarProcess : public Process<GenericRegistrarProcess>
+class AgentRegistrarProcess : public Process<AgentRegistrarProcess>
{
public:
- GenericRegistrarProcess(Owned<Storage> storage);
+ AgentRegistrarProcess(Owned<Storage> storage);
Future<Nothing> recover();
@@ -182,13 +182,13 @@ private:
};
-GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
- : ProcessBase(process::ID::generate("resource-provider-generic-registrar")),
+AgentRegistrarProcess::AgentRegistrarProcess(Owned<Storage> _storage)
+ : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
storage(std::move(_storage)),
state(storage.get()) {}
-Future<Nothing> GenericRegistrarProcess::recover()
+Future<Nothing> AgentRegistrarProcess::recover()
{
constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";
@@ -206,8 +206,7 @@ Future<Nothing> GenericRegistrarProcess::recover()
}
-Future<bool> GenericRegistrarProcess::apply(
- Owned<Registrar::Operation> operation)
+Future<bool> AgentRegistrarProcess::apply(Owned<Registrar::Operation> operation)
{
if (recovered.isNone()) {
return Failure("Attempted to apply the operation before recovering");
@@ -217,7 +216,7 @@ Future<bool> GenericRegistrarProcess::apply(
}
-Future<bool> GenericRegistrarProcess::_apply(
+Future<bool> AgentRegistrarProcess::_apply(
Owned<Registrar::Operation> operation)
{
if (error.isSome()) {
@@ -235,7 +234,7 @@ Future<bool> GenericRegistrarProcess::_apply(
}
-void GenericRegistrarProcess::update()
+void AgentRegistrarProcess::update()
{
CHECK(!updating);
CHECK_NONE(error);
@@ -276,7 +275,7 @@ void GenericRegistrarProcess::update()
}
-void GenericRegistrarProcess::_update(
+void AgentRegistrarProcess::_update(
const Future<Option<Variable<Registry>>>& store,
const Registry& updatedRegistry,
deque<Owned<Registrar::Operation>> applied)
@@ -323,31 +322,31 @@ void GenericRegistrarProcess::_update(
}
-GenericRegistrar::GenericRegistrar(Owned<Storage> storage)
- : process(new GenericRegistrarProcess(std::move(storage)))
+AgentRegistrar::AgentRegistrar(Owned<Storage> storage)
+ : process(new AgentRegistrarProcess(std::move(storage)))
{
process::spawn(process.get(), false);
}
-GenericRegistrar::~GenericRegistrar()
+AgentRegistrar::~AgentRegistrar()
{
process::terminate(*process);
process::wait(*process);
}
-Future<Nothing> GenericRegistrar::recover()
+Future<Nothing> AgentRegistrar::recover()
{
- return dispatch(process.get(), &GenericRegistrarProcess::recover);
+ return dispatch(process.get(), &AgentRegistrarProcess::recover);
}
-Future<bool> GenericRegistrar::apply(Owned<Operation> operation)
+Future<bool> AgentRegistrar::apply(Owned<Operation> operation)
{
return dispatch(
process.get(),
- &GenericRegistrarProcess::apply,
+ &AgentRegistrarProcess::apply,
std::move(operation));
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/6d563823/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
index 3c10785..34cb166 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -105,22 +105,22 @@ private:
};
-class GenericRegistrarProcess;
+class AgentRegistrarProcess;
-class GenericRegistrar : public Registrar
+class AgentRegistrar : public Registrar
{
public:
- GenericRegistrar(process::Owned<state::Storage> storage);
+ AgentRegistrar(process::Owned<state::Storage> storage);
- ~GenericRegistrar() override;
+ ~AgentRegistrar() override;
process::Future<Nothing> recover() override;
process::Future<bool> apply(process::Owned<Operation> operation) override;
private:
- std::unique_ptr<GenericRegistrarProcess> process;
+ std::unique_ptr<AgentRegistrarProcess> process;
};
http://git-wip-us.apache.org/repos/asf/mesos/blob/6d563823/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index ceb7854..72e8122 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -819,8 +819,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
class ResourceProviderRegistrarTest : public tests::MesosTest {};
-// Test that the generic resource provider registrar works as expected.
-TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
+// Test that the agent resource provider registrar works as expected.
+TEST_F(ResourceProviderRegistrarTest, AgentRegistrar)
{
ResourceProviderID resourceProviderId;
resourceProviderId.set_value("foo");
[5/9] mesos git commit: Revert "Set up recovery code paths of
resource provider manager."
Posted by al...@apache.org.
Revert "Set up recovery code paths of resource provider manager."
This reverts commit bfcf5571869598a2e6d75550013fdaefa57dd6cb.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f670e2b3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f670e2b3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f670e2b3
Branch: refs/heads/master
Commit: f670e2b329514282515a5df2b52d0ddbcbb92a8b
Parents: 26626f4
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:09:03 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:09:03 2018 +0200
----------------------------------------------------------------------
src/resource_provider/manager.cpp | 323 +++++++++------------
src/resource_provider/registrar.cpp | 91 +++---
src/resource_provider/registrar.hpp | 18 +-
src/tests/resource_provider_manager_tests.cpp | 15 +-
4 files changed, 187 insertions(+), 260 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 6393e7a..259a810 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -36,7 +36,6 @@
#include <process/metrics/metrics.hpp>
#include <stout/hashmap.hpp>
-#include <stout/nothing.hpp>
#include <stout/protobuf.hpp>
#include <stout/uuid.hpp>
@@ -48,7 +47,6 @@
#include "internal/devolve.hpp"
#include "internal/evolve.hpp"
-#include "resource_provider/registry.hpp"
#include "resource_provider/validation.hpp"
namespace http = process::http;
@@ -62,8 +60,6 @@ using mesos::resource_provider::Call;
using mesos::resource_provider::Event;
using mesos::resource_provider::Registrar;
-using mesos::resource_provider::registry::Registry;
-
using process::Failure;
using process::Future;
using process::Owned;
@@ -80,10 +76,10 @@ using process::wait;
using process::http::Accepted;
using process::http::BadRequest;
+using process::http::OK;
using process::http::MethodNotAllowed;
using process::http::NotAcceptable;
using process::http::NotImplemented;
-using process::http::OK;
using process::http::Pipe;
using process::http::UnsupportedMediaType;
@@ -196,11 +192,6 @@ private:
ResourceProvider* resourceProvider,
const Call::UpdatePublishResourcesStatus& update);
- Future<Nothing> recover(
- const mesos::resource_provider::registry::Registry& registry);
-
- void initialize() override;
-
ResourceProviderID newResourceProviderId();
double gaugeSubscribed();
@@ -218,9 +209,6 @@ private:
Gauge subscribed;
};
- Owned<Registrar> registrar;
- Promise<Nothing> recovered;
-
Metrics metrics;
};
@@ -228,191 +216,152 @@ private:
ResourceProviderManagerProcess::ResourceProviderManagerProcess(
Owned<Registrar> _registrar)
: ProcessBase(process::ID::generate("resource-provider-manager")),
- registrar(std::move(_registrar)),
metrics(*this)
{
- CHECK_NOTNULL(registrar.get());
+ CHECK_NOTNULL(_registrar.get());
}
-void ResourceProviderManagerProcess::initialize()
+Future<http::Response> ResourceProviderManagerProcess::api(
+ const http::Request& request,
+ const Option<Principal>& principal)
{
- // Recover the registrar.
- registrar->recover()
- .then(defer(self(), &ResourceProviderManagerProcess::recover, lambda::_1))
- .onAny([](const Future<Nothing>& recovered) {
- if (!recovered.isReady()) {
- LOG(FATAL)
- << "Failed to recover resource provider manager registry: "
- << recovered;
- }
- });
-}
+ if (request.method != "POST") {
+ return MethodNotAllowed({"POST"}, request.method);
+ }
+ v1::resource_provider::Call v1Call;
-Future<Nothing> ResourceProviderManagerProcess::recover(
- const mesos::resource_provider::registry::Registry& registry)
-{
- recovered.set(Nothing());
+ // TODO(anand): Content type values are case-insensitive.
+ Option<string> contentType = request.headers.get("Content-Type");
- return Nothing();
-}
+ if (contentType.isNone()) {
+ return BadRequest("Expecting 'Content-Type' to be present");
+ }
+ if (contentType.get() == APPLICATION_PROTOBUF) {
+ if (!v1Call.ParseFromString(request.body)) {
+ return BadRequest("Failed to parse body into Call protobuf");
+ }
+ } else if (contentType.get() == APPLICATION_JSON) {
+ Try<JSON::Value> value = JSON::parse(request.body);
+ if (value.isError()) {
+ return BadRequest("Failed to parse body into JSON: " + value.error());
+ }
-Future<http::Response> ResourceProviderManagerProcess::api(
- const http::Request& request,
- const Option<Principal>& principal)
-{
- // TODO(bbannier): This implementation does not limit the number of messages
- // in the actor's inbox which could become large should a big number of
- // resource providers attempt to subscribe before recovery completed. Consider
- // rejecting requests until the resource provider manager has recovered. This
- // would likely require implementing retry logic in resource providers.
- return recovered.future().then(defer(
- self(), [this, request, principal](const Nothing&) -> http::Response {
- if (request.method != "POST") {
- return MethodNotAllowed({"POST"}, request.method);
- }
-
- v1::resource_provider::Call v1Call;
-
- // TODO(anand): Content type values are case-insensitive.
- Option<string> contentType = request.headers.get("Content-Type");
-
- if (contentType.isNone()) {
- return BadRequest("Expecting 'Content-Type' to be present");
- }
-
- if (contentType.get() == APPLICATION_PROTOBUF) {
- if (!v1Call.ParseFromString(request.body)) {
- return BadRequest("Failed to parse body into Call protobuf");
- }
- } else if (contentType.get() == APPLICATION_JSON) {
- Try<JSON::Value> value = JSON::parse(request.body);
- if (value.isError()) {
- return BadRequest(
- "Failed to parse body into JSON: " + value.error());
- }
-
- Try<v1::resource_provider::Call> parse =
- ::protobuf::parse<v1::resource_provider::Call>(value.get());
-
- if (parse.isError()) {
- return BadRequest(
- "Failed to convert JSON into Call protobuf: " + parse.error());
- }
-
- v1Call = parse.get();
- } else {
- return UnsupportedMediaType(
- string("Expecting 'Content-Type' of ") + APPLICATION_JSON +
- " or " + APPLICATION_PROTOBUF);
- }
-
- Call call = devolve(v1Call);
-
- Option<Error> error = validate(call);
- if (error.isSome()) {
- return BadRequest(
- "Failed to validate resource_provider::Call: " + error->message);
- }
-
- if (call.type() == Call::SUBSCRIBE) {
- // We default to JSON 'Content-Type' in the response since an empty
- // 'Accept' header results in all media types considered acceptable.
- ContentType acceptType = ContentType::JSON;
-
- if (request.acceptsMediaType(APPLICATION_JSON)) {
- acceptType = ContentType::JSON;
- } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
- acceptType = ContentType::PROTOBUF;
- } else {
- return NotAcceptable(
- string("Expecting 'Accept' to allow ") + "'" +
- APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
- }
-
- if (request.headers.contains("Mesos-Stream-Id")) {
- return BadRequest(
- "Subscribe calls should not include the 'Mesos-Stream-Id' "
- "header");
- }
-
- Pipe pipe;
- OK ok;
-
- ok.headers["Content-Type"] = stringify(acceptType);
- ok.type = http::Response::PIPE;
- ok.reader = pipe.reader();
-
- // Generate a stream ID and return it in the response.
- id::UUID streamId = id::UUID::random();
- ok.headers["Mesos-Stream-Id"] = streamId.toString();
-
- HttpConnection http(pipe.writer(), acceptType, streamId);
- this->subscribe(http, call.subscribe());
-
- return std::move(ok);
- }
-
- if (!this->resourceProviders.subscribed.contains(
- call.resource_provider_id())) {
- return BadRequest("Resource provider is not subscribed");
- }
-
- ResourceProvider* resourceProvider =
- this->resourceProviders.subscribed.at(call.resource_provider_id())
- .get();
-
- // This isn't a `SUBSCRIBE` call, so the request should include a stream
- // ID.
- if (!request.headers.contains("Mesos-Stream-Id")) {
- return BadRequest(
- "All non-subscribe calls should include to 'Mesos-Stream-Id' "
- "header");
- }
-
- const string& streamId = request.headers.at("Mesos-Stream-Id");
- if (streamId != resourceProvider->http.streamId.toString()) {
- return BadRequest(
- "The stream ID '" + streamId +
- "' included in this request "
- "didn't match the stream ID currently associated with "
- " resource provider ID " +
- resourceProvider->info.id().value());
- }
-
- switch (call.type()) {
- case Call::UNKNOWN: {
- return NotImplemented();
- }
-
- case Call::SUBSCRIBE: {
- // `SUBSCRIBE` call should have been handled above.
- LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
- }
-
- case Call::UPDATE_OPERATION_STATUS: {
- this->updateOperationStatus(
- resourceProvider, call.update_operation_status());
-
- return Accepted();
- }
-
- case Call::UPDATE_STATE: {
- this->updateState(resourceProvider, call.update_state());
- return Accepted();
- }
-
- case Call::UPDATE_PUBLISH_RESOURCES_STATUS: {
- this->updatePublishResourcesStatus(
- resourceProvider, call.update_publish_resources_status());
- return Accepted();
- }
- }
-
- UNREACHABLE();
- }));
+ Try<v1::resource_provider::Call> parse =
+ ::protobuf::parse<v1::resource_provider::Call>(value.get());
+
+ if (parse.isError()) {
+ return BadRequest("Failed to convert JSON into Call protobuf: " +
+ parse.error());
+ }
+
+ v1Call = parse.get();
+ } else {
+ return UnsupportedMediaType(
+ string("Expecting 'Content-Type' of ") +
+ APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
+ }
+
+ Call call = devolve(v1Call);
+
+ Option<Error> error = validate(call);
+ if (error.isSome()) {
+ return BadRequest(
+ "Failed to validate resource_provider::Call: " + error->message);
+ }
+
+ if (call.type() == Call::SUBSCRIBE) {
+ // We default to JSON 'Content-Type' in the response since an empty
+ // 'Accept' header results in all media types considered acceptable.
+ ContentType acceptType = ContentType::JSON;
+
+ if (request.acceptsMediaType(APPLICATION_JSON)) {
+ acceptType = ContentType::JSON;
+ } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
+ acceptType = ContentType::PROTOBUF;
+ } else {
+ return NotAcceptable(
+ string("Expecting 'Accept' to allow ") +
+ "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
+ }
+
+ if (request.headers.contains("Mesos-Stream-Id")) {
+ return BadRequest(
+ "Subscribe calls should not include the 'Mesos-Stream-Id' header");
+ }
+
+ Pipe pipe;
+ OK ok;
+
+ ok.headers["Content-Type"] = stringify(acceptType);
+ ok.type = http::Response::PIPE;
+ ok.reader = pipe.reader();
+
+ // Generate a stream ID and return it in the response.
+ id::UUID streamId = id::UUID::random();
+ ok.headers["Mesos-Stream-Id"] = streamId.toString();
+
+ HttpConnection http(pipe.writer(), acceptType, streamId);
+ subscribe(http, call.subscribe());
+
+ return ok;
+ }
+
+ if (!resourceProviders.subscribed.contains(call.resource_provider_id())) {
+ return BadRequest("Resource provider is not subscribed");
+ }
+
+ ResourceProvider* resourceProvider =
+ resourceProviders.subscribed.at(call.resource_provider_id()).get();
+
+ // This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
+ if (!request.headers.contains("Mesos-Stream-Id")) {
+ return BadRequest(
+ "All non-subscribe calls should include to 'Mesos-Stream-Id' header");
+ }
+
+ const string& streamId = request.headers.at("Mesos-Stream-Id");
+ if (streamId != resourceProvider->http.streamId.toString()) {
+ return BadRequest(
+ "The stream ID '" + streamId + "' included in this request "
+ "didn't match the stream ID currently associated with "
+ " resource provider ID " + resourceProvider->info.id().value());
+ }
+
+ switch(call.type()) {
+ case Call::UNKNOWN: {
+ return NotImplemented();
+ }
+
+ case Call::SUBSCRIBE: {
+ // `SUBSCRIBE` call should have been handled above.
+ LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
+ }
+
+ case Call::UPDATE_OPERATION_STATUS: {
+ updateOperationStatus(
+ resourceProvider,
+ call.update_operation_status());
+
+ return Accepted();
+ }
+
+ case Call::UPDATE_STATE: {
+ updateState(resourceProvider, call.update_state());
+ return Accepted();
+ }
+
+ case Call::UPDATE_PUBLISH_RESOURCES_STATUS: {
+ updatePublishResourcesStatus(
+ resourceProvider,
+ call.update_publish_resources_status());
+ return Accepted();
+ }
+ }
+
+ UNREACHABLE();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index d7ec6a6..b151e2b 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -92,11 +92,9 @@ Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage)
}
-Try<Owned<Registrar>> Registrar::create(
- master::Registrar* registrar,
- Registry registry)
+Try<Owned<Registrar>> Registrar::create(master::Registrar* registrar)
{
- return new MasterRegistrar(registrar, std::move(registry));
+ return new MasterRegistrar(registrar);
}
@@ -152,29 +150,28 @@ class GenericRegistrarProcess : public Process<GenericRegistrarProcess>
public:
GenericRegistrarProcess(Owned<Storage> storage);
- Future<Registry> recover();
+ Future<Nothing> recover();
Future<bool> apply(Owned<Registrar::Operation> operation);
- void update();
-
- void initialize() override;
-
-private:
Future<bool> _apply(Owned<Registrar::Operation> operation);
+ void update();
+
void _update(
const Future<Option<Variable<Registry>>>& store,
+ const Registry& updatedRegistry,
deque<Owned<Registrar::Operation>> applied);
-
+private:
Owned<Storage> storage;
// Use fully qualified type for `State` to disambiguate with `State`
// enumeration in `ProcessBase`.
mesos::state::protobuf::State state;
- Promise<Nothing> recovered;
+ Option<Future<Nothing>> recovered;
+ Option<Registry> registry;
Option<Variable<Registry>> variable;
Option<Error> error;
@@ -194,36 +191,32 @@ GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
}
-void GenericRegistrarProcess::initialize()
+Future<Nothing> GenericRegistrarProcess::recover()
{
constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";
- CHECK_NONE(variable);
-
- recovered.associate(state.fetch<Registry>(NAME).then(
- defer(self(), [this](const Variable<Registry>& recovery) {
- variable = recovery;
- return Nothing();
- })));
-}
+ if (recovered.isNone()) {
+ recovered = state.fetch<Registry>(NAME).then(
+ defer(self(), [this](const Variable<Registry>& recovery) {
+ registry = recovery.get();
+ variable = recovery;
+ return Nothing();
+ }));
+ }
-Future<Registry> GenericRegistrarProcess::recover()
-{
- // Prevent discards on the returned `Future` by marking the result as
- // `undiscardable` so that we control the lifetime of the recovering registry.
- return undiscardable(recovered.future()).then([this](const Nothing&) {
- CHECK_SOME(this->variable);
- return this->variable->get();
- });
+ return recovered.get();
}
Future<bool> GenericRegistrarProcess::apply(
Owned<Registrar::Operation> operation)
{
- return undiscardable(recovered.future()).then(
- defer(self(), &Self::_apply, std::move(operation)));
+ if (recovered.isNone()) {
+ return Failure("Attempted to apply the operation before recovering");
+ }
+
+ return recovered->then(defer(self(), &Self::_apply, std::move(operation)));
}
@@ -256,9 +249,8 @@ void GenericRegistrarProcess::update()
updating = true;
- CHECK_SOME(variable);
-
- Registry updatedRegistry = variable->get();
+ CHECK_SOME(registry);
+ Registry updatedRegistry = registry.get();
foreach (Owned<Registrar::Operation>& operation, operations) {
Try<bool> operationResult = (*operation)(&updatedRegistry);
@@ -280,6 +272,7 @@ void GenericRegistrarProcess::update()
self(),
&Self::_update,
lambda::_1,
+ updatedRegistry,
std::move(operations)));
operations.clear();
@@ -288,6 +281,7 @@ void GenericRegistrarProcess::update()
void GenericRegistrarProcess::_update(
const Future<Option<Variable<Registry>>>& store,
+ const Registry& updatedRegistry,
deque<Owned<Registrar::Operation>> applied)
{
updating = false;
@@ -316,6 +310,7 @@ void GenericRegistrarProcess::_update(
}
variable = store->get();
+ registry = updatedRegistry;
// Remove the operations.
while (!applied.empty()) {
@@ -345,7 +340,7 @@ GenericRegistrar::~GenericRegistrar()
}
-Future<Registry> GenericRegistrar::recover()
+Future<Nothing> GenericRegistrar::recover()
{
return dispatch(process.get(), &GenericRegistrarProcess::recover);
}
@@ -369,8 +364,6 @@ class MasterRegistrarProcess : public Process<MasterRegistrarProcess>
public:
AdaptedOperation(Owned<Registrar::Operation> operation);
- Future<registry::Registry> recover();
-
private:
Try<bool> perform(internal::Registry* registry, hashset<SlaveID>*) override;
@@ -383,17 +376,12 @@ class MasterRegistrarProcess : public Process<MasterRegistrarProcess>
};
public:
- explicit MasterRegistrarProcess(
- master::Registrar* registrar,
- Registry registry);
+ explicit MasterRegistrarProcess(master::Registrar* registrar);
Future<bool> apply(Owned<Registrar::Operation> operation);
- Future<registry::Registry> recover() { return registry; }
-
private:
master::Registrar* registrar = nullptr;
- Registry registry;
};
@@ -410,12 +398,9 @@ Try<bool> MasterRegistrarProcess::AdaptedOperation::perform(
}
-MasterRegistrarProcess::MasterRegistrarProcess(
- master::Registrar* _registrar,
- registry::Registry _registry)
+MasterRegistrarProcess::MasterRegistrarProcess(master::Registrar* _registrar)
: ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
- registrar(_registrar),
- registry(std::move(_registry)) {}
+ registrar(_registrar) {}
Future<bool> MasterRegistrarProcess::apply(
@@ -428,10 +413,8 @@ Future<bool> MasterRegistrarProcess::apply(
}
-MasterRegistrar::MasterRegistrar(
- master::Registrar* registrar,
- registry::Registry registry)
- : process(new MasterRegistrarProcess(registrar, std::move(registry)))
+MasterRegistrar::MasterRegistrar(master::Registrar* registrar)
+ : process(new MasterRegistrarProcess(registrar))
{
spawn(process.get(), false);
}
@@ -444,9 +427,9 @@ MasterRegistrar::~MasterRegistrar()
}
-Future<Registry> MasterRegistrar::recover()
+Future<Nothing> MasterRegistrar::recover()
{
- return dispatch(process.get(), &MasterRegistrarProcess::recover);
+ return Nothing();
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
index ded56e1..3c10785 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -71,16 +71,12 @@ public:
process::Owned<state::Storage> storage);
// Create a registry on top of a master's persistent state.
- //
- // The created registrar does not take ownership of the passed registrar
- // which needs to be valid as long as the created registrar is alive.
static Try<process::Owned<Registrar>> create(
- mesos::internal::master::Registrar* registrar,
- registry::Registry registry);
+ mesos::internal::master::Registrar* registrar);
virtual ~Registrar() = default;
- virtual process::Future<registry::Registry> recover() = 0;
+ virtual process::Future<Nothing> recover() = 0;
virtual process::Future<bool> apply(process::Owned<Operation> operation) = 0;
};
@@ -119,7 +115,7 @@ public:
~GenericRegistrar() override;
- process::Future<registry::Registry> recover() override;
+ process::Future<Nothing> recover() override;
process::Future<bool> apply(process::Owned<Operation> operation) override;
@@ -134,17 +130,13 @@ class MasterRegistrarProcess;
class MasterRegistrar : public Registrar
{
public:
- // The created registrar does not take ownership of the passed registrar
- // which needs to be valid as long as the created registrar is alive.
- explicit MasterRegistrar(
- mesos::internal::master::Registrar* registrar,
- registry::Registry registry);
+ explicit MasterRegistrar(mesos::internal::master::Registrar* Registrar);
~MasterRegistrar() override;
// This registrar performs no recovery; instead to recover
// the underlying master registrar needs to be recovered.
- process::Future<registry::Registry> recover() override;
+ process::Future<Nothing> recover() override;
process::Future<bool> apply(process::Owned<Operation> operation) override;
http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index eb8e4fc..1664073 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -841,6 +841,10 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
ASSERT_SOME(registrar);
ASSERT_NE(nullptr, registrar->get());
+ // Applying operations on a not yet recovered registrar fails.
+ AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
+ new AdmitResourceProvider(resourceProviderId))));
+
AWAIT_READY(registrar.get()->recover());
Future<bool> admitResourceProvider =
@@ -869,16 +873,15 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
const MasterInfo masterInfo = protobuf::createMasterInfo({});
- Future<Registry> registry = masterRegistrar.recover(masterInfo);
- AWAIT_READY(registry);
-
- Try<Owned<Registrar>> registrar = Registrar::create(
- &masterRegistrar,
- registry->resource_provider_registry());
+ Try<Owned<Registrar>> registrar = Registrar::create(&masterRegistrar);
ASSERT_SOME(registrar);
ASSERT_NE(nullptr, registrar->get());
+ // Applying operations on a not yet recovered registrar fails.
+ AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
+ new AdmitResourceProvider(resourceProviderId))));
+
AWAIT_READY(masterRegistrar.recover(masterInfo));
Future<bool> admitResourceProvider =
[4/9] mesos git commit: Revert "Remembered recovered and subscribed
providers in ephemeral state."
Posted by al...@apache.org.
Revert "Remembered recovered and subscribed providers in ephemeral state."
This reverts commit ac870c00ab11ccc14b9c872c78130ef4d568c0ee.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/26626f42
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/26626f42
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/26626f42
Branch: refs/heads/master
Commit: 26626f42e974ec012a276f08006231723f72e51b
Parents: 2855a6b
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:08:57 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:08:57 2018 +0200
----------------------------------------------------------------------
src/resource_provider/manager.cpp | 22 ----------------------
1 file changed, 22 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/26626f42/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 32f23a7..6393e7a 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -208,10 +208,6 @@ private:
struct ResourceProviders
{
hashmap<ResourceProviderID, Owned<ResourceProvider>> subscribed;
- hashmap<
- ResourceProviderID,
- mesos::resource_provider::registry::ResourceProvider>
- known;
} resourceProviders;
struct Metrics
@@ -257,13 +253,6 @@ void ResourceProviderManagerProcess::initialize()
Future<Nothing> ResourceProviderManagerProcess::recover(
const mesos::resource_provider::registry::Registry& registry)
{
- foreach (
- const mesos::resource_provider::registry::ResourceProvider&
- resourceProvider,
- registry.resource_providers()) {
- resourceProviders.known.put(resourceProvider.id(), resourceProvider);
- }
-
recovered.set(Nothing());
return Nothing();
@@ -708,17 +697,6 @@ void ResourceProviderManagerProcess::subscribe(
resourceProviders.subscribed.put(
resourceProviderId,
std::move(resourceProvider));
-
- if (!resourceProviders.known.contains(resourceProviderId)) {
- mesos::resource_provider::registry::ResourceProvider resourceProvider_;
- resourceProvider_.mutable_id()->CopyFrom(resourceProviderId);
-
- resourceProviders.known.put(
- resourceProviderId,
- std::move(resourceProvider_));
-
- // TODO(bbannier): Persist this information in the registry.
- }
}
[9/9] mesos git commit: Revert "Delayed construction of the agent's
resource provider manager."
Posted by al...@apache.org.
Revert "Delayed construction of the agent's resource provider manager."
This reverts commit 0424a6623d08440d8dbe5aff5ec2f18df7b93e24.
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a1c6a7a3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a1c6a7a3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a1c6a7a3
Branch: refs/heads/master
Commit: a1c6a7a3c54518f97a34f28b1792885b928b948c
Parents: ed92ee4
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:09:33 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:09:33 2018 +0200
----------------------------------------------------------------------
src/slave/slave.cpp | 94 +++++-----------------
src/slave/slave.hpp | 6 +-
src/tests/resource_provider_manager_tests.cpp | 9 +--
3 files changed, 22 insertions(+), 87 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d313777..d0ff5f8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -30,8 +30,6 @@
#include <utility>
#include <vector>
-#include <glog/logging.h>
-
#include <mesos/type_utils.hpp>
#include <mesos/authentication/secret_generator.hpp>
@@ -774,20 +772,15 @@ void Slave::initialize()
logRequest(request);
return http.executor(request, principal);
});
- route(
- "/api/v1/resource_provider",
- READWRITE_HTTP_AUTHENTICATION_REALM,
- Http::RESOURCE_PROVIDER_HELP(),
- [this](const http::Request& request, const Option<Principal>& principal)
- -> Future<http::Response> {
- logRequest(request);
-
- if (resourceProviderManager.get() == nullptr) {
- return http::ServiceUnavailable();
- }
- return resourceProviderManager->api(request, principal);
- });
+ route("/api/v1/resource_provider",
+ READWRITE_HTTP_AUTHENTICATION_REALM,
+ Http::RESOURCE_PROVIDER_HELP(),
+ [this](const http::Request& request,
+ const Option<Principal>& principal) {
+ logRequest(request);
+ return resourceProviderManager.api(request, principal);
+ });
// TODO(ijimenez): Remove this endpoint at the end of the
// deprecation cycle on 0.26.
@@ -1509,8 +1502,6 @@ void Slave::registered(
CHECK_SOME(state::checkpoint(path, info));
- initializeResourceProviderManager(flags, info.id());
-
// We start the local resource providers daemon once the agent is
// running, so the resource providers can use the agent API.
localResourceProviderDaemon->start(info.id());
@@ -4353,7 +4344,7 @@ void Slave::applyOperation(const ApplyOperationMessage& message)
}
if (resourceProviderId.isSome()) {
- CHECK_NOTNULL(resourceProviderManager.get())->applyOperation(message);
+ resourceProviderManager.applyOperation(message);
return;
}
@@ -4426,7 +4417,7 @@ void Slave::reconcileOperations(const ReconcileOperationsMessage& message)
}
if (containsResourceProviderOperations) {
- CHECK_NOTNULL(resourceProviderManager.get())->reconcileOperations(message);
+ resourceProviderManager.reconcileOperations(message);
}
}
@@ -4558,19 +4549,7 @@ void Slave::operationStatusAcknowledgement(
{
Operation* operation = getOperation(acknowledgement.operation_uuid());
if (operation != nullptr) {
- // If the operation was on resource provider resources forward the
- // acknowledgement to the resource provider manager as well.
- Result<ResourceProviderID> resourceProviderId =
- getResourceProviderId(operation->info());
-
- CHECK(!resourceProviderId.isError())
- << "Could not determine resource provider of operation " << operation
- << ": " << resourceProviderId.error();
-
- if (resourceProviderId.isSome()) {
- CHECK_NOTNULL(resourceProviderManager.get())
- ->acknowledgeOperationStatus(acknowledgement);
- }
+ resourceProviderManager.acknowledgeOperationStatus(acknowledgement);
CHECK(operation->statuses_size() > 0);
if (protobuf::isTerminalState(
@@ -7340,8 +7319,10 @@ void Slave::__recover(const Future<Nothing>& future)
detection = detector->detect()
.onAny(defer(self(), &Slave::detected, lambda::_1));
- if (info.has_id()) {
- initializeResourceProviderManager(flags, info.id());
+ if (capabilities.resourceProvider) {
+ // Start listening for messages from the resource provider manager.
+ resourceProviderManager.messages().get().onAny(
+ defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
}
// Forward oversubscribed resources.
@@ -7619,7 +7600,7 @@ void Slave::handleResourceProviderMessage(
<< (message.isFailed() ? message.failure() : "future discarded");
// Wait for the next message.
- CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
+ resourceProviderManager.messages().get()
.onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
return;
@@ -7878,7 +7859,7 @@ void Slave::handleResourceProviderMessage(
}
// Wait for the next message.
- CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
+ resourceProviderManager.messages().get()
.onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
}
@@ -8133,24 +8114,6 @@ void Slave::apply(Operation* operation)
Future<Nothing> Slave::publishResources(
const Option<Resources>& additionalResources)
{
- // If the resource provider manager has not been created yet no resource
- // providers have been added and we do not need to publish anything.
- if (resourceProviderManager == nullptr) {
- // We check whether the passed additional resources are compatible
- // with the expectation that no resource provider resources are in
- // use, yet. This is not an exhaustive consistency check.
- if (additionalResources.isSome()) {
- foreach (const Resource& resource, additionalResources.get()) {
- CHECK(!resource.has_provider_id())
- << "Cannot publish resource provider resources "
- << additionalResources.get()
- << " until resource providers have subscribed";
- }
- }
-
- return Nothing();
- }
-
Resources resources;
// NOTE: For resources providers that serve quantity-based resources
@@ -8171,8 +8134,7 @@ Future<Nothing> Slave::publishResources(
resources += additionalResources.get();
}
- return CHECK_NOTNULL(resourceProviderManager.get())
- ->publishResources(resources);
+ return resourceProviderManager.publishResources(resources);
}
@@ -8792,26 +8754,6 @@ double Slave::_resources_revocable_percent(const string& name)
}
-void Slave::initializeResourceProviderManager(
- const Flags& flags,
- const SlaveID& slaveId)
-{
- // To simplify reasoning about lifetimes we do not allow
- // reinitialization of the resource provider manager.
- if (resourceProviderManager.get() != nullptr) {
- return;
- }
-
- resourceProviderManager.reset(new ResourceProviderManager());
-
- if (capabilities.resourceProvider) {
- // Start listening for messages from the resource provider manager.
- resourceProviderManager->messages().get().onAny(
- defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
- }
-}
-
-
Framework::Framework(
Slave* _slave,
const Flags& slaveFlags,
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index c3866c6..c35996b 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -711,10 +711,6 @@ private:
const SlaveInfo& previous,
const SlaveInfo& current) const;
- void initializeResourceProviderManager(
- const Flags& flags,
- const SlaveID& slaveId);
-
protobuf::master::Capabilities requiredMasterCapabilities;
const Flags flags;
@@ -816,7 +812,7 @@ private:
// (allocated and oversubscribable) resources.
Option<Resources> oversubscribedResources;
- process::Owned<ResourceProviderManager> resourceProviderManager;
+ ResourceProviderManager resourceProviderManager;
process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
// Local resource providers known by the agent.
http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 0de4e79..c52541b 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -755,17 +755,14 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
- Owned<MasterDetector> detector = master.get()->createDetector();
+ Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
- // For the agent's resource provider manager to start,
- // the agent needs to have been assigned an agent ID.
- Future<SlaveRegisteredMessage> slaveRegisteredMessage =
- FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+ Owned<MasterDetector> detector = master.get()->createDetector();
Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
ASSERT_SOME(agent);
- AWAIT_READY(slaveRegisteredMessage);
+ AWAIT_READY(__recover);
// Wait for recovery to be complete.
Clock::pause();