You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/05/02 01:35:55 UTC
[7/9] mesos git commit: Added admitted resource providers to the
manager's registry.
Added admitted resource providers to the manager's registry.
Review: https://reviews.apache.org/r/66545/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/65ed45fb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/65ed45fb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/65ed45fb
Branch: refs/heads/master
Commit: 65ed45fb039e7da64a58094aff67cf0189aa1f6d
Parents: 9a8cef0
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue May 1 13:09:19 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue May 1 13:09:19 2018 -0700
----------------------------------------------------------------------
src/resource_provider/manager.cpp | 45 ++++++++++++++++++++--
src/resource_provider/registrar.cpp | 10 +++++
src/resource_provider/registry.proto | 1 +
src/tests/resource_provider_manager_tests.cpp | 28 ++++++++++----
4 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index bc52741..5979480 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -58,6 +58,7 @@ using std::string;
using mesos::internal::resource_provider::validation::call::validate;
+using mesos::resource_provider::AdmitResourceProvider;
using mesos::resource_provider::Call;
using mesos::resource_provider::Event;
using mesos::resource_provider::Registrar;
@@ -184,6 +185,10 @@ private:
const HttpConnection& http,
const Call::Subscribe& subscribe);
+ void _subscribe(
+ const Future<bool>& admitResourceProvider,
+ Owned<ResourceProvider> resourceProvider);
+
void updateOperationStatus(
ResourceProvider* resourceProvider,
const Call::UpdateOperationStatus& update);
@@ -657,17 +662,53 @@ void ResourceProviderManagerProcess::subscribe(
Owned<ResourceProvider> resourceProvider(
new ResourceProvider(resourceProviderInfo, http));
+ Future<bool> admitResourceProvider;
+
if (!resourceProviderInfo.has_id()) {
// The resource provider is subscribing for the first time.
resourceProvider->info.mutable_id()->CopyFrom(newResourceProviderId());
+
+ // If we are handing out a new `ResourceProviderID` persist the ID by
+ // triggering a `AdmitResourceProvider` operation on the registrar.
+ admitResourceProvider =
+ registrar->apply(Owned<mesos::resource_provider::Registrar::Operation>(
+ new AdmitResourceProvider(resourceProvider->info.id())));
} else {
// TODO(chhsiao): The resource provider is resubscribing after being
// restarted or an agent failover. The 'ResourceProviderInfo' might
// have been updated, but its type and name should remain the same.
// We should checkpoint its 'type', 'name' and ID, then check if the
// resubscribption is consistent with the checkpointed record.
+
+ // If the resource provider is known we do not need to admit it
+ // again, and the registrar operation implicitly succeeded.
+ admitResourceProvider = true;
}
+ admitResourceProvider.onAny(defer(
+ self(),
+ &ResourceProviderManagerProcess::_subscribe,
+ lambda::_1,
+ std::move(resourceProvider)));
+}
+
+
+void ResourceProviderManagerProcess::_subscribe(
+ const Future<bool>& admitResourceProvider,
+ Owned<ResourceProvider> resourceProvider)
+{
+ if (!admitResourceProvider.isReady()) {
+ LOG(INFO)
+ << "Not subscribing resource provider " << resourceProvider->info.id()
+ << " as registry update did not succeed: " << admitResourceProvider;
+
+ return;
+ }
+
+ CHECK(admitResourceProvider.get())
+ << "Could not admit resource provider " << resourceProvider->info.id()
+ << " as registry update was rejected";
+
const ResourceProviderID& resourceProviderId = resourceProvider->info.id();
Event event;
@@ -681,7 +722,7 @@ void ResourceProviderManagerProcess::subscribe(
return;
}
- http.closed()
+ resourceProvider->http.closed()
.onAny(defer(self(), [=](const Future<Nothing>& future) {
// Iff the remote side closes the HTTP connection, the future will be
// ready. We will remove the resource provider in that case.
@@ -716,8 +757,6 @@ void ResourceProviderManagerProcess::subscribe(
resourceProviders.known.put(
resourceProviderId,
std::move(resourceProvider_));
-
- // TODO(bbannier): Persist this information in the registry.
}
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index 6cc4625..a855a2b 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -115,6 +115,15 @@ Try<bool> AdmitResourceProvider::perform(Registry* registry)
return Error("Resource provider already admitted");
}
+ if (std::find_if(
+ registry->removed_resource_providers().begin(),
+ registry->removed_resource_providers().end(),
+ [this](const ResourceProvider& resourceProvider) {
+ return resourceProvider.id() == this->id;
+ }) != registry->removed_resource_providers().end()) {
+ return Error("Resource provider was removed");
+ }
+
ResourceProvider resourceProvider;
resourceProvider.mutable_id()->CopyFrom(id);
@@ -141,6 +150,7 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
return Error("Attempted to remove an unknown resource provider");
}
+ registry->add_removed_resource_providers()->CopyFrom(*pos);
registry->mutable_resource_providers()->erase(pos);
return true; // Mutation.
http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/resource_provider/registry.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.proto b/src/resource_provider/registry.proto
index 14bd433..491263e 100644
--- a/src/resource_provider/registry.proto
+++ b/src/resource_provider/registry.proto
@@ -35,4 +35,5 @@ message ResourceProvider {
// A top level object that is managed by the Registrar and persisted.
message Registry {
repeated ResourceProvider resource_providers = 1;
+ repeated ResourceProvider removed_resource_providers = 2;
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index eb8e4fc..8c364a1 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -843,17 +843,24 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
AWAIT_READY(registrar.get()->recover());
- Future<bool> admitResourceProvider =
+ Future<bool> admitResourceProvider1 =
registrar.get()->apply(Owned<Registrar::Operation>(
new AdmitResourceProvider(resourceProviderId)));
- AWAIT_READY(admitResourceProvider);
- EXPECT_TRUE(admitResourceProvider.get());
+ AWAIT_READY(admitResourceProvider1);
+ EXPECT_TRUE(admitResourceProvider1.get());
Future<bool> removeResourceProvider =
registrar.get()->apply(Owned<Registrar::Operation>(
new RemoveResourceProvider(resourceProviderId)));
AWAIT_READY(removeResourceProvider);
EXPECT_TRUE(removeResourceProvider.get());
+
+ // A removed resource provider cannot be admitted again.
+ Future<bool> admitResourceProvider2 =
+ registrar.get()->apply(Owned<Registrar::Operation>(
+ new AdmitResourceProvider(resourceProviderId)));
+ AWAIT_READY(admitResourceProvider2);
+ EXPECT_FALSE(admitResourceProvider2.get());
}
@@ -879,19 +886,24 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
ASSERT_SOME(registrar);
ASSERT_NE(nullptr, registrar->get());
- AWAIT_READY(masterRegistrar.recover(masterInfo));
-
- Future<bool> admitResourceProvider =
+ Future<bool> admitResourceProvider1 =
registrar.get()->apply(Owned<Registrar::Operation>(
new AdmitResourceProvider(resourceProviderId)));
- AWAIT_READY(admitResourceProvider);
- EXPECT_TRUE(admitResourceProvider.get());
+ AWAIT_READY(admitResourceProvider1);
+ EXPECT_TRUE(admitResourceProvider1.get());
Future<bool> removeResourceProvider =
registrar.get()->apply(Owned<Registrar::Operation>(
new RemoveResourceProvider(resourceProviderId)));
AWAIT_READY(removeResourceProvider);
EXPECT_TRUE(removeResourceProvider.get());
+
+ // A removed resource provider cannot be admitted again.
+ Future<bool> admitResourceProvider2 =
+ registrar.get()->apply(Owned<Registrar::Operation>(
+ new AdmitResourceProvider(resourceProviderId)));
+ AWAIT_READY(admitResourceProvider2);
+ EXPECT_FALSE(admitResourceProvider2.get());
}