You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/05/02 01:35:55 UTC

[7/9] mesos git commit: Added admitted resource providers to the manager's registry.

Added admitted resource providers to the manager's registry.

Review: https://reviews.apache.org/r/66545/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/65ed45fb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/65ed45fb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/65ed45fb

Branch: refs/heads/master
Commit: 65ed45fb039e7da64a58094aff67cf0189aa1f6d
Parents: 9a8cef0
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue May 1 13:09:19 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue May 1 13:09:19 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 45 ++++++++++++++++++++--
 src/resource_provider/registrar.cpp           | 10 +++++
 src/resource_provider/registry.proto          |  1 +
 src/tests/resource_provider_manager_tests.cpp | 28 ++++++++++----
 4 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index bc52741..5979480 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -58,6 +58,7 @@ using std::string;
 
 using mesos::internal::resource_provider::validation::call::validate;
 
+using mesos::resource_provider::AdmitResourceProvider;
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::Registrar;
@@ -184,6 +185,10 @@ private:
       const HttpConnection& http,
       const Call::Subscribe& subscribe);
 
+  void _subscribe(
+      const Future<bool>& admitResourceProvider,
+      Owned<ResourceProvider> resourceProvider);
+
   void updateOperationStatus(
       ResourceProvider* resourceProvider,
       const Call::UpdateOperationStatus& update);
@@ -657,17 +662,53 @@ void ResourceProviderManagerProcess::subscribe(
   Owned<ResourceProvider> resourceProvider(
       new ResourceProvider(resourceProviderInfo, http));
 
+  Future<bool> admitResourceProvider;
+
   if (!resourceProviderInfo.has_id()) {
     // The resource provider is subscribing for the first time.
     resourceProvider->info.mutable_id()->CopyFrom(newResourceProviderId());
+
+    // If we are handing out a new `ResourceProviderID` persist the ID by
+    // triggering a `AdmitResourceProvider` operation on the registrar.
+    admitResourceProvider =
+      registrar->apply(Owned<mesos::resource_provider::Registrar::Operation>(
+          new AdmitResourceProvider(resourceProvider->info.id())));
   } else {
     // TODO(chhsiao): The resource provider is resubscribing after being
     // restarted or an agent failover. The 'ResourceProviderInfo' might
     // have been updated, but its type and name should remain the same.
     // We should checkpoint its 'type', 'name' and ID, then check if the
     // resubscribption is consistent with the checkpointed record.
+
+    // If the resource provider is known we do not need to admit it
+    // again, and the registrar operation implicitly succeeded.
+    admitResourceProvider = true;
   }
 
+  admitResourceProvider.onAny(defer(
+      self(),
+      &ResourceProviderManagerProcess::_subscribe,
+      lambda::_1,
+      std::move(resourceProvider)));
+}
+
+
+void ResourceProviderManagerProcess::_subscribe(
+    const Future<bool>& admitResourceProvider,
+    Owned<ResourceProvider> resourceProvider)
+{
+  if (!admitResourceProvider.isReady()) {
+    LOG(INFO)
+      << "Not subscribing resource provider " << resourceProvider->info.id()
+      << " as registry update did not succeed: " << admitResourceProvider;
+
+    return;
+  }
+
+  CHECK(admitResourceProvider.get())
+    << "Could not admit resource provider " << resourceProvider->info.id()
+    << " as registry update was rejected";
+
   const ResourceProviderID& resourceProviderId = resourceProvider->info.id();
 
   Event event;
@@ -681,7 +722,7 @@ void ResourceProviderManagerProcess::subscribe(
     return;
   }
 
-  http.closed()
+  resourceProvider->http.closed()
     .onAny(defer(self(), [=](const Future<Nothing>& future) {
       // Iff the remote side closes the HTTP connection, the future will be
       // ready. We will remove the resource provider in that case.
@@ -716,8 +757,6 @@ void ResourceProviderManagerProcess::subscribe(
     resourceProviders.known.put(
         resourceProviderId,
         std::move(resourceProvider_));
-
-    // TODO(bbannier): Persist this information in the registry.
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index 6cc4625..a855a2b 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -115,6 +115,15 @@ Try<bool> AdmitResourceProvider::perform(Registry* registry)
     return Error("Resource provider already admitted");
   }
 
+  if (std::find_if(
+          registry->removed_resource_providers().begin(),
+          registry->removed_resource_providers().end(),
+          [this](const ResourceProvider& resourceProvider) {
+            return resourceProvider.id() == this->id;
+          }) != registry->removed_resource_providers().end()) {
+    return Error("Resource provider was removed");
+  }
+
   ResourceProvider resourceProvider;
   resourceProvider.mutable_id()->CopyFrom(id);
 
@@ -141,6 +150,7 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
     return Error("Attempted to remove an unknown resource provider");
   }
 
+  registry->add_removed_resource_providers()->CopyFrom(*pos);
   registry->mutable_resource_providers()->erase(pos);
 
   return true; // Mutation.

http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/resource_provider/registry.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.proto b/src/resource_provider/registry.proto
index 14bd433..491263e 100644
--- a/src/resource_provider/registry.proto
+++ b/src/resource_provider/registry.proto
@@ -35,4 +35,5 @@ message ResourceProvider {
 // A top level object that is managed by the Registrar and persisted.
 message Registry {
   repeated ResourceProvider resource_providers = 1;
+  repeated ResourceProvider removed_resource_providers = 2;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index eb8e4fc..8c364a1 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -843,17 +843,24 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
 
   AWAIT_READY(registrar.get()->recover());
 
-  Future<bool> admitResourceProvider =
+  Future<bool> admitResourceProvider1 =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new AdmitResourceProvider(resourceProviderId)));
-  AWAIT_READY(admitResourceProvider);
-  EXPECT_TRUE(admitResourceProvider.get());
+  AWAIT_READY(admitResourceProvider1);
+  EXPECT_TRUE(admitResourceProvider1.get());
 
   Future<bool> removeResourceProvider =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new RemoveResourceProvider(resourceProviderId)));
   AWAIT_READY(removeResourceProvider);
   EXPECT_TRUE(removeResourceProvider.get());
+
+  // A removed resource provider cannot be admitted again.
+  Future<bool> admitResourceProvider2 =
+    registrar.get()->apply(Owned<Registrar::Operation>(
+        new AdmitResourceProvider(resourceProviderId)));
+  AWAIT_READY(admitResourceProvider2);
+  EXPECT_FALSE(admitResourceProvider2.get());
 }
 
 
@@ -879,19 +886,24 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
-  AWAIT_READY(masterRegistrar.recover(masterInfo));
-
-  Future<bool> admitResourceProvider =
+  Future<bool> admitResourceProvider1 =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new AdmitResourceProvider(resourceProviderId)));
-  AWAIT_READY(admitResourceProvider);
-  EXPECT_TRUE(admitResourceProvider.get());
+  AWAIT_READY(admitResourceProvider1);
+  EXPECT_TRUE(admitResourceProvider1.get());
 
   Future<bool> removeResourceProvider =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new RemoveResourceProvider(resourceProviderId)));
   AWAIT_READY(removeResourceProvider);
   EXPECT_TRUE(removeResourceProvider.get());
+
+  // A removed resource provider cannot be admitted again.
+  Future<bool> admitResourceProvider2 =
+    registrar.get()->apply(Owned<Registrar::Operation>(
+        new AdmitResourceProvider(resourceProviderId)));
+  AWAIT_READY(admitResourceProvider2);
+  EXPECT_FALSE(admitResourceProvider2.get());
 }