You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/04/25 10:58:18 UTC

[01/10] mesos git commit: Inserted explicit moves in libprocess.

Repository: mesos
Updated Branches:
  refs/heads/master b95d68d63 -> b1538db6b


Inserted explicit moves in libprocess.

In this patch we insert explicit moves in places where otherwise the
value would have been copied. In particular, an argument passed as an
rvalue will be available as an lvalue in a function's body, and an
explicit cast back to an rvalue is required to avoid creating a copy.
Similarly, when the type of the return value is different from the
type of the value being returned, a temporary will be constructed to
convert to the correct type. That temporary might be elided due to
RVO, but nothing in (N)RVO would cause the conversion itself to move
from its source; inserting an explicit move will allow the compiler to
select a potentially more efficient way to construct the return value.

These instances were identified by a new move-related diagnostic added
recently to clang-7.

Review: https://reviews.apache.org/r/66779


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/defbe56f
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/defbe56f
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/defbe56f

Branch: refs/heads/master
Commit: defbe56fc99e383a6c4ed16668f1b8fb3e5a45bb
Parents: b95d68d
Author: Benjamin Bannier <bb...@apache.org>
Authored: Tue Apr 24 13:06:57 2018 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Apr 25 12:33:47 2018 +0200

----------------------------------------------------------------------
 3rdparty/libprocess/include/process/address.hpp | 3 ++-
 3rdparty/libprocess/src/memory_profiler.cpp     | 3 ++-
 2 files changed, 4 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/defbe56f/3rdparty/libprocess/include/process/address.hpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/include/process/address.hpp b/3rdparty/libprocess/include/process/address.hpp
index 7fb980e..e740e84 100644
--- a/3rdparty/libprocess/include/process/address.hpp
+++ b/3rdparty/libprocess/include/process/address.hpp
@@ -29,6 +29,7 @@
 #endif // __WINDOWS__
 
 #include <ostream>
+#include <utility>
 
 #include <boost/functional/hash.hpp>
 
@@ -524,7 +525,7 @@ inline Try<inet::Address> convert(Try<Address>&& address)
 template <>
 inline Try<Address> convert(Try<Address>&& address)
 {
-  return address;
+  return std::move(address);
 }
 
 } // namespace network {

http://git-wip-us.apache.org/repos/asf/mesos/blob/defbe56f/3rdparty/libprocess/src/memory_profiler.cpp
----------------------------------------------------------------------
diff --git a/3rdparty/libprocess/src/memory_profiler.cpp b/3rdparty/libprocess/src/memory_profiler.cpp
index 5d6e9dc..b765246 100644
--- a/3rdparty/libprocess/src/memory_profiler.cpp
+++ b/3rdparty/libprocess/src/memory_profiler.cpp
@@ -16,6 +16,7 @@
 #include <fstream>
 #include <sstream>
 #include <string>
+#include <utility>
 
 #include <process/delay.hpp>
 #include <process/future.hpp>
@@ -619,7 +620,7 @@ http::Response MemoryProfiler::DiskArtifact::asHttp() const
   response.headers["Content-Disposition"] =
     strings::format("attachment; filename=%s", path).get();
 
-  return response;
+  return std::move(response);
 }
 
 


[05/10] mesos git commit: Passed on registrar when constructing resource provider manager.

Posted by bb...@apache.org.
Passed on registrar when constructing resource provider manager.

In order to support recovering resource provider manager information
in the future, we need to adjust the construction of the manager to be
able to consume a registrar.

This patch lays the groundwork by adjusting interfaces and their
usage; we will make use of the passed on information in a following
patch.

Review: https://reviews.apache.org/r/66310/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e1e9a041
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e1e9a041
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e1e9a041

Branch: refs/heads/master
Commit: e1e9a041be55182fc4d683d973f0a2cd25919cf1
Parents: f6becd6
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue Apr 24 11:36:17 2018 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Apr 25 12:37:10 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 15 ++++++++----
 src/resource_provider/manager.hpp             |  5 +++-
 src/resource_provider/registrar.cpp           |  5 +++-
 src/slave/slave.cpp                           | 27 ++++++++++++++++++++-
 src/tests/resource_provider_manager_tests.cpp | 28 +++++++++++++++-------
 5 files changed, 63 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e1e9a041/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 68e1866..259a810 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -58,6 +58,7 @@ using mesos::internal::resource_provider::validation::call::validate;
 
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
+using mesos::resource_provider::Registrar;
 
 using process::Failure;
 using process::Future;
@@ -157,7 +158,7 @@ class ResourceProviderManagerProcess
   : public Process<ResourceProviderManagerProcess>
 {
 public:
-  ResourceProviderManagerProcess();
+  ResourceProviderManagerProcess(Owned<Registrar> _registrar);
 
   Future<http::Response> api(
       const http::Request& request,
@@ -212,9 +213,13 @@ private:
 };
 
 
-ResourceProviderManagerProcess::ResourceProviderManagerProcess()
+ResourceProviderManagerProcess::ResourceProviderManagerProcess(
+    Owned<Registrar> _registrar)
   : ProcessBase(process::ID::generate("resource-provider-manager")),
-    metrics(*this) {}
+    metrics(*this)
+{
+  CHECK_NOTNULL(_registrar.get());
+}
 
 
 Future<http::Response> ResourceProviderManagerProcess::api(
@@ -763,8 +768,8 @@ ResourceProviderManagerProcess::Metrics::~Metrics()
 }
 
 
-ResourceProviderManager::ResourceProviderManager()
-  : process(new ResourceProviderManagerProcess())
+ResourceProviderManager::ResourceProviderManager(Owned<Registrar> registrar)
+  : process(new ResourceProviderManagerProcess(std::move(registrar)))
 {
   spawn(CHECK_NOTNULL(process.get()));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1e9a041/src/resource_provider/manager.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index bc017fa..6c57956 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -26,6 +26,7 @@
 #include "messages/messages.hpp"
 
 #include "resource_provider/message.hpp"
+#include "resource_provider/registrar.hpp"
 
 namespace mesos {
 namespace internal {
@@ -37,7 +38,9 @@ class ResourceProviderManagerProcess;
 class ResourceProviderManager
 {
 public:
-  ResourceProviderManager();
+  ResourceProviderManager(
+      process::Owned<resource_provider::Registrar> registrar);
+
   ~ResourceProviderManager();
 
   ResourceProviderManager(

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1e9a041/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index 53b403e..b151e2b 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -185,7 +185,10 @@ private:
 GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
   : ProcessBase(process::ID::generate("resource-provider-generic-registrar")),
     storage(std::move(_storage)),
-    state(storage.get()) {}
+    state(storage.get())
+{
+  CHECK_NOTNULL(storage.get());
+}
 
 
 Future<Nothing> GenericRegistrarProcess::recover()

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1e9a041/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d313777..2632772 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -38,6 +38,9 @@
 
 #include <mesos/module/authenticatee.hpp>
 
+#include <mesos/state/leveldb.hpp>
+#include <mesos/state/in_memory.hpp>
+
 #include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
 
 #include <process/after.hpp>
@@ -8802,7 +8805,29 @@ void Slave::initializeResourceProviderManager(
     return;
   }
 
-  resourceProviderManager.reset(new ResourceProviderManager());
+  // The registrar uses LevelDB as underlying storage. Since LevelDB
+  // is currently not supported on Windows (see MESOS-5932), we fall
+  // back to in-memory storage there.
+  //
+  // TODO(bbannier): Remove this Windows workaround once MESOS-5932 is fixed.
+#ifndef __WINDOWS__
+  Owned<mesos::state::Storage> storage(new mesos::state::LevelDBStorage(
+      paths::getResourceProviderRegistryPath(flags.work_dir, slaveId)));
+#else
+  LOG(WARNING)
+    << "Persisting resource provider manager state is not supported on Windows";
+  Owned<mesos::state::Storage> storage(new mesos::state::InMemoryStorage());
+#endif // __WINDOWS__
+
+  Try<Owned<resource_provider::Registrar>> resourceProviderRegistrar =
+    resource_provider::Registrar::create(std::move(storage));
+
+  CHECK_SOME(resourceProviderRegistrar)
+    << "Could not construct resource provider registrar: "
+    << resourceProviderRegistrar.error();
+
+  resourceProviderManager.reset(
+      new ResourceProviderManager(std::move(resourceProviderRegistrar.get())));
 
   if (capabilities.resourceProvider) {
     // Start listening for messages from the resource provider manager.

http://git-wip-us.apache.org/repos/asf/mesos/blob/e1e9a041/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index ceb7854..1664073 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -67,6 +67,7 @@ using mesos::master::detector::MasterDetector;
 
 using mesos::state::InMemoryStorage;
 using mesos::state::State;
+using mesos::state::Storage;
 
 using mesos::resource_provider::AdmitResourceProvider;
 using mesos::resource_provider::Registrar;
@@ -129,7 +130,8 @@ TEST_F(ResourceProviderManagerHttpApiTest, NoContentType)
   request.method = "POST";
   request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Future<http::Response> response = manager.api(request, None());
 
@@ -154,7 +156,8 @@ TEST_F(ResourceProviderManagerHttpApiTest, ValidJsonButInvalidProtobuf)
   request.headers["Content-Type"] = APPLICATION_JSON;
   request.body = stringify(object);
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Future<http::Response> response = manager.api(request, None());
 
@@ -177,7 +180,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, MalformedContent)
   request.headers["Content-Type"] = stringify(contentType);
   request.body = "MALFORMED_CONTENT";
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Future<http::Response> response = manager.api(request, None());
 
@@ -223,7 +227,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, UnsupportedContentMediaType)
   request.headers["Content-Type"] = unknownMediaType;
   request.body = serialize(contentType, call);
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Future<http::Response> response = manager.api(request, None());
 
@@ -235,7 +240,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Option<id::UUID> streamId;
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -342,7 +348,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateOperationStatus)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Option<id::UUID> streamId;
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -460,7 +467,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesSuccess)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Option<id::UUID> streamId;
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -567,7 +575,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesFailure)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Option<id::UUID> streamId;
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -674,7 +683,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesDisconnected)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
   Option<http::Pipe::Reader> reader;


[06/10] mesos git commit: Added admitted resource providers to the manager's registry.

Posted by bb...@apache.org.
Added admitted resource providers to the manager's registry.

Review: https://reviews.apache.org/r/66545/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/23ff6622
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/23ff6622
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/23ff6622

Branch: refs/heads/master
Commit: 23ff6622fde41f40ee1d4ee914b5302c7b25de01
Parents: ac870c0
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue Apr 24 11:36:28 2018 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Apr 25 12:37:11 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 45 ++++++++++++++++++++--
 src/resource_provider/registrar.cpp           | 10 +++++
 src/resource_provider/registry.proto          |  1 +
 src/tests/resource_provider_manager_tests.cpp | 28 ++++++++++----
 4 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/23ff6622/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 32f23a7..1219bb7 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -58,6 +58,7 @@ using std::string;
 
 using mesos::internal::resource_provider::validation::call::validate;
 
+using mesos::resource_provider::AdmitResourceProvider;
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::Registrar;
@@ -184,6 +185,10 @@ private:
       const HttpConnection& http,
       const Call::Subscribe& subscribe);
 
+  void _subscribe(
+      const Future<bool>& admitResourceProvider,
+      Owned<ResourceProvider> resourceProvider);
+
   void updateOperationStatus(
       ResourceProvider* resourceProvider,
       const Call::UpdateOperationStatus& update);
@@ -657,17 +662,53 @@ void ResourceProviderManagerProcess::subscribe(
   Owned<ResourceProvider> resourceProvider(
       new ResourceProvider(resourceProviderInfo, http));
 
+  Future<bool> admitResourceProvider;
+
   if (!resourceProviderInfo.has_id()) {
     // The resource provider is subscribing for the first time.
     resourceProvider->info.mutable_id()->CopyFrom(newResourceProviderId());
+
+    // If we are handing out a new `ResourceProviderID` persist the ID by
+    // triggering a `AdmitResourceProvider` operation on the registrar.
+    admitResourceProvider =
+      registrar->apply(Owned<mesos::resource_provider::Registrar::Operation>(
+          new AdmitResourceProvider(resourceProvider->info.id())));
   } else {
     // TODO(chhsiao): The resource provider is resubscribing after being
     // restarted or an agent failover. The 'ResourceProviderInfo' might
     // have been updated, but its type and name should remain the same.
     // We should checkpoint its 'type', 'name' and ID, then check if the
     // resubscribption is consistent with the checkpointed record.
+
+    // If the resource provider is known we do not need to admit it
+    // again, and the registrar operation implicitly succeeded.
+    admitResourceProvider = true;
   }
 
+  admitResourceProvider.onAny(defer(
+      self(),
+      &ResourceProviderManagerProcess::_subscribe,
+      lambda::_1,
+      std::move(resourceProvider)));
+}
+
+
+void ResourceProviderManagerProcess::_subscribe(
+    const Future<bool>& admitResourceProvider,
+    Owned<ResourceProvider> resourceProvider)
+{
+  if (!admitResourceProvider.isReady()) {
+    LOG(INFO)
+      << "Not subscribing resource provider " << resourceProvider->info.id()
+      << " as registry update did not succeed: " << admitResourceProvider;
+
+    return;
+  }
+
+  CHECK(admitResourceProvider.get())
+    << "Could not admit resource provider " << resourceProvider->info.id()
+    << " as registry update was rejected";
+
   const ResourceProviderID& resourceProviderId = resourceProvider->info.id();
 
   Event event;
@@ -681,7 +722,7 @@ void ResourceProviderManagerProcess::subscribe(
     return;
   }
 
-  http.closed()
+  resourceProvider->http.closed()
     .onAny(defer(self(), [=](const Future<Nothing>& future) {
       // Iff the remote side closes the HTTP connection, the future will be
       // ready. We will remove the resource provider in that case.
@@ -716,8 +757,6 @@ void ResourceProviderManagerProcess::subscribe(
     resourceProviders.known.put(
         resourceProviderId,
         std::move(resourceProvider_));
-
-    // TODO(bbannier): Persist this information in the registry.
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/23ff6622/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index d7ec6a6..c506ec1 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -115,6 +115,15 @@ Try<bool> AdmitResourceProvider::perform(Registry* registry)
     return Error("Resource provider already admitted");
   }
 
+  if (std::find_if(
+          registry->removed_resource_providers().begin(),
+          registry->removed_resource_providers().end(),
+          [this](const ResourceProvider& resourceProvider) {
+            return resourceProvider.id() == this->id;
+          }) != registry->removed_resource_providers().end()) {
+    return Error("Resource provider was removed");
+  }
+
   ResourceProvider resourceProvider;
   resourceProvider.mutable_id()->CopyFrom(id);
 
@@ -141,6 +150,7 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
     return Error("Attempted to remove an unknown resource provider");
   }
 
+  registry->add_removed_resource_providers()->CopyFrom(*pos);
   registry->mutable_resource_providers()->erase(pos);
 
   return true; // Mutation.

http://git-wip-us.apache.org/repos/asf/mesos/blob/23ff6622/src/resource_provider/registry.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.proto b/src/resource_provider/registry.proto
index 14bd433..491263e 100644
--- a/src/resource_provider/registry.proto
+++ b/src/resource_provider/registry.proto
@@ -35,4 +35,5 @@ message ResourceProvider {
 // A top level object that is managed by the Registrar and persisted.
 message Registry {
   repeated ResourceProvider resource_providers = 1;
+  repeated ResourceProvider removed_resource_providers = 2;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/23ff6622/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index eb8e4fc..8c364a1 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -843,17 +843,24 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
 
   AWAIT_READY(registrar.get()->recover());
 
-  Future<bool> admitResourceProvider =
+  Future<bool> admitResourceProvider1 =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new AdmitResourceProvider(resourceProviderId)));
-  AWAIT_READY(admitResourceProvider);
-  EXPECT_TRUE(admitResourceProvider.get());
+  AWAIT_READY(admitResourceProvider1);
+  EXPECT_TRUE(admitResourceProvider1.get());
 
   Future<bool> removeResourceProvider =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new RemoveResourceProvider(resourceProviderId)));
   AWAIT_READY(removeResourceProvider);
   EXPECT_TRUE(removeResourceProvider.get());
+
+  // A removed resource provider cannot be admitted again.
+  Future<bool> admitResourceProvider2 =
+    registrar.get()->apply(Owned<Registrar::Operation>(
+        new AdmitResourceProvider(resourceProviderId)));
+  AWAIT_READY(admitResourceProvider2);
+  EXPECT_FALSE(admitResourceProvider2.get());
 }
 
 
@@ -879,19 +886,24 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
-  AWAIT_READY(masterRegistrar.recover(masterInfo));
-
-  Future<bool> admitResourceProvider =
+  Future<bool> admitResourceProvider1 =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new AdmitResourceProvider(resourceProviderId)));
-  AWAIT_READY(admitResourceProvider);
-  EXPECT_TRUE(admitResourceProvider.get());
+  AWAIT_READY(admitResourceProvider1);
+  EXPECT_TRUE(admitResourceProvider1.get());
 
   Future<bool> removeResourceProvider =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new RemoveResourceProvider(resourceProviderId)));
   AWAIT_READY(removeResourceProvider);
   EXPECT_TRUE(removeResourceProvider.get());
+
+  // A removed resource provider cannot be admitted again.
+  Future<bool> admitResourceProvider2 =
+    registrar.get()->apply(Owned<Registrar::Operation>(
+        new AdmitResourceProvider(resourceProviderId)));
+  AWAIT_READY(admitResourceProvider2);
+  EXPECT_FALSE(admitResourceProvider2.get());
 }
 
 


[10/10] mesos git commit: Remembered recovered and subscribed providers in ephemeral state.

Posted by bb...@apache.org.
Remembered recovered and subscribed providers in ephemeral state.

This patch adds a data structure to keep track of subscribed and
recovered resource providers in the ephemeral state of the resource
provider manager.

Review: https://reviews.apache.org/r/66544/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ac870c00
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ac870c00
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ac870c00

Branch: refs/heads/master
Commit: ac870c00ab11ccc14b9c872c78130ef4d568c0ee
Parents: bfcf557
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue Apr 24 11:36:25 2018 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Apr 25 12:37:11 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/manager.cpp | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ac870c00/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 6393e7a..32f23a7 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -208,6 +208,10 @@ private:
   struct ResourceProviders
   {
     hashmap<ResourceProviderID, Owned<ResourceProvider>> subscribed;
+    hashmap<
+        ResourceProviderID,
+        mesos::resource_provider::registry::ResourceProvider>
+      known;
   } resourceProviders;
 
   struct Metrics
@@ -253,6 +257,13 @@ void ResourceProviderManagerProcess::initialize()
 Future<Nothing> ResourceProviderManagerProcess::recover(
     const mesos::resource_provider::registry::Registry& registry)
 {
+  foreach (
+      const mesos::resource_provider::registry::ResourceProvider&
+        resourceProvider,
+      registry.resource_providers()) {
+    resourceProviders.known.put(resourceProvider.id(), resourceProvider);
+  }
+
   recovered.set(Nothing());
 
   return Nothing();
@@ -697,6 +708,17 @@ void ResourceProviderManagerProcess::subscribe(
   resourceProviders.subscribed.put(
       resourceProviderId,
       std::move(resourceProvider));
+
+  if (!resourceProviders.known.contains(resourceProviderId)) {
+    mesos::resource_provider::registry::ResourceProvider resourceProvider_;
+    resourceProvider_.mutable_id()->CopyFrom(resourceProviderId);
+
+    resourceProviders.known.put(
+        resourceProviderId,
+        std::move(resourceProvider_));
+
+    // TODO(bbannier): Persist this information in the registry.
+  }
 }
 
 


[08/10] mesos git commit: Prevent resubscription of resource providers with unknown IDs.

Posted by bb...@apache.org.
Prevent resubscription of resource providers with unknown IDs.

This patch adds a check to the resource provider manager's subscribe
functionality making sure that any ID sent by a resubscribing resource
provider corresponds to some previously known resource provider.

This not only serves as convenient validation of user-provided data,
but also makes sure that the internal state of the resource provider
manager remains consistent.

Review: https://reviews.apache.org/r/66546/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/1679d4c8
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/1679d4c8
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/1679d4c8

Branch: refs/heads/master
Commit: 1679d4c85c0bd095e2dafaf7a95adde258db8179
Parents: 23ff662
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue Apr 24 11:36:31 2018 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Apr 25 12:37:11 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 13 ++++-
 src/tests/resource_provider_manager_tests.cpp | 61 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/1679d4c8/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 1219bb7..05911de 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -678,7 +678,18 @@ void ResourceProviderManagerProcess::subscribe(
     // restarted or an agent failover. The 'ResourceProviderInfo' might
     // have been updated, but its type and name should remain the same.
     // We should checkpoint its 'type', 'name' and ID, then check if the
-    // resubscribption is consistent with the checkpointed record.
+    // resubscription is consistent with the checkpointed record.
+
+    const ResourceProviderID& resourceProviderId = resourceProviderInfo.id();
+
+    if (!resourceProviders.known.contains(resourceProviderId)) {
+      LOG(INFO)
+        << "Dropping resubscription attempt of resource provider with ID "
+        << resourceProviderId
+        << " since it is unknown";
+
+      return;
+    }
 
     // If the resource provider is known we do not need to admit it
     // again, and the registrar operation implicitly succeeded.

http://git-wip-us.apache.org/repos/asf/mesos/blob/1679d4c8/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 8c364a1..1ee4dc3 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -1150,6 +1150,67 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
 }
 
 
+// Test that when a resource provider attempts to resubscribe with an
+// unknown ID it is not admitted but disconnected.
+TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeUnknownID)
+{
+  Clock::pause();
+
+  // Start master and agent.
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // For the agent's resource provider manager to start,
+  // the agent needs to have been assigned an agent ID.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(agent);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::settle();
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  mesos::v1::ResourceProviderID resourceProviderId;
+  resourceProviderId.set_value(id::UUID::random().toString());
+
+  mesos::v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.mutable_id()->CopyFrom(resourceProviderId);
+  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+  resourceProviderInfo.set_name("test");
+
+  Owned<v1::MockResourceProvider> resourceProvider(
+      new v1::MockResourceProvider(resourceProviderInfo));
+
+  // We explicitly reset the resource provider after the expected
+  // disconnect to prevent it from resubscribing indefinitely.
+  Future<Nothing> disconnected;
+  EXPECT_CALL(*resourceProvider, disconnected())
+    .WillOnce(DoAll(
+        Invoke([&resourceProvider]() { resourceProvider.reset(); }),
+        FutureSatisfy(&disconnected)));
+
+  // Start and register a resource provider.
+  Owned<EndpointDetector> endpointDetector(
+      resource_provider::createEndpointDetector(agent.get()->pid));
+
+  const ContentType contentType = GetParam();
+
+  resourceProvider->start(
+      endpointDetector,
+      contentType,
+      v1::DEFAULT_CREDENTIAL);
+
+  AWAIT_READY(disconnected);
+}
+
+
 // This test verifies that a disconnected resource provider will
 // result in an `UpdateSlaveMessage` to be sent to the master and the
 // total resources of the disconnected resource provider will be


[07/10] mesos git commit: Removed redundant master flags in resource provider tests.

Posted by bb...@apache.org.
Removed redundant master flags in resource provider tests.

Review: https://reviews.apache.org/r/66780


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1538db6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1538db6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1538db6

Branch: refs/heads/master
Commit: b1538db6b9d12633195e0d5a0eff48246ea0d4ad
Parents: 1679d4c
Author: Benjamin Bannier <bb...@apache.org>
Authored: Tue Apr 24 16:27:15 2018 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Apr 25 12:37:11 2018 +0200

----------------------------------------------------------------------
 src/tests/resource_provider_manager_tests.cpp | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b1538db6/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 1ee4dc3..889cb32 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -914,8 +914,7 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
 TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
 {
   // Start master and agent.
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
@@ -1050,8 +1049,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
   Clock::pause();
 
   // Start master and agent.
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
@@ -1305,8 +1303,7 @@ TEST_F(ResourceProviderManagerHttpApiTest, ResourceProviderSubscribeDisconnect)
   Clock::pause();
 
   // Start master and agent.
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
@@ -1382,8 +1379,7 @@ TEST_F(ResourceProviderManagerHttpApiTest, Metrics)
   Clock::pause();
 
   // Start master and agent.
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();


[03/10] mesos git commit: Externalized creation of resource provider manager backing storage.

Posted by bb...@apache.org.
Externalized creation of resource provider manager backing storage.

This patch changes the way the storage backing an agent's resource
provider registrar is created: previously we created it implicitly
when constructing the registrar; now the registrar consumes storage
that is passed in on construction.

Being able to explicitly inject the used storage simplifies testing.

Review: https://reviews.apache.org/r/66309/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6f6413b6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6f6413b6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6f6413b6

Branch: refs/heads/master
Commit: 6f6413b618b4d7aec7c8f8e6fa9e3542f1af2b9c
Parents: 0424a66
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue Apr 24 11:36:12 2018 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Apr 25 12:34:37 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/registrar.cpp           | 49 ++++------------------
 src/resource_provider/registrar.hpp           | 15 ++++---
 src/tests/resource_provider_manager_tests.cpp | 27 +-----------
 3 files changed, 16 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6f6413b6/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index 92ef9ae..dbb55dd 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -27,10 +27,6 @@
 
 #include <mesos/state/in_memory.hpp>
 
-#ifndef __WINDOWS__
-#include <mesos/state/leveldb.hpp>
-#endif // __WINDOWS__
-
 #include <mesos/state/protobuf.hpp>
 
 #include <process/defer.hpp>
@@ -53,12 +49,6 @@ using std::string;
 using mesos::resource_provider::registry::Registry;
 using mesos::resource_provider::registry::ResourceProvider;
 
-using mesos::state::InMemoryStorage;
-
-#ifndef __WINDOWS__
-using mesos::state::LevelDBStorage;
-#endif // __WINDOWS__
-
 using mesos::state::Storage;
 
 using mesos::state::protobuf::Variable;
@@ -96,11 +86,9 @@ bool Registrar::Operation::set()
 }
 
 
-Try<Owned<Registrar>> Registrar::create(
-    const slave::Flags& slaveFlags,
-    const SlaveID& slaveId)
+Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage)
 {
-  return new AgentRegistrar(slaveFlags, slaveId);
+  return new AgentRegistrar(std::move(storage));
 }
 
 
@@ -160,7 +148,7 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
 class AgentRegistrarProcess : public Process<AgentRegistrarProcess>
 {
 public:
-  AgentRegistrarProcess(const slave::Flags& flags, const SlaveID& slaveId);
+  AgentRegistrarProcess(Owned<Storage> storage);
 
   Future<Nothing> recover();
 
@@ -191,33 +179,12 @@ private:
   deque<Owned<Registrar::Operation>> operations;
 
   bool updating = false;
-
-  static Owned<Storage> createStorage(const std::string& path);
 };
 
 
-Owned<Storage> AgentRegistrarProcess::createStorage(const std::string& path)
-{
-  // The registrar uses LevelDB as underlying storage. Since LevelDB
-  // is currently not supported on Windows (see MESOS-5932), we fall
-  // back to in-memory storage there.
-  //
-  // TODO(bbannier): Remove this Windows workaround once MESOS-5932 is fixed.
-#ifndef __WINDOWS__
-  return Owned<Storage>(new LevelDBStorage(path));
-#else
-  LOG(WARNING)
-    << "Persisting resource provider manager state is not supported on Windows";
-  return Owned<Storage>(new InMemoryStorage());
-#endif // __WINDOWS__
-}
-
-
-AgentRegistrarProcess::AgentRegistrarProcess(
-    const slave::Flags& flags, const SlaveID& slaveId)
+AgentRegistrarProcess::AgentRegistrarProcess(Owned<Storage> _storage)
   : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
-    storage(createStorage(slave::paths::getResourceProviderRegistryPath(
-        flags.work_dir, slaveId))),
+    storage(std::move(_storage)),
     state(storage.get()) {}
 
 
@@ -355,10 +322,8 @@ void AgentRegistrarProcess::_update(
 }
 
 
-AgentRegistrar::AgentRegistrar(
-    const slave::Flags& slaveFlags,
-    const SlaveID& slaveId)
-  : process(new AgentRegistrarProcess(slaveFlags, slaveId))
+AgentRegistrar::AgentRegistrar(Owned<Storage> storage)
+  : process(new AgentRegistrarProcess(std::move(storage)))
 {
   process::spawn(process.get(), false);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/6f6413b6/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
index 39f45b0..34cb166 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -19,6 +19,8 @@
 
 #include <memory>
 
+#include <mesos/state/storage.hpp>
+
 #include <process/future.hpp>
 #include <process/owned.hpp>
 
@@ -64,14 +66,13 @@ public:
     bool success = false;
   };
 
-  // Create a registry on top of a master's persistent state.
+  // Create a registry on top of generic storage.
   static Try<process::Owned<Registrar>> create(
-      mesos::internal::master::Registrar* registrar);
+      process::Owned<state::Storage> storage);
 
-  // Create a registry on top of an agent's persistent state.
+  // Create a registry on top of a master's persistent state.
   static Try<process::Owned<Registrar>> create(
-      const mesos::internal::slave::Flags& slaveFlags,
-      const SlaveID& slaveId);
+      mesos::internal::master::Registrar* registrar);
 
   virtual ~Registrar() = default;
 
@@ -110,9 +111,7 @@ class AgentRegistrarProcess;
 class AgentRegistrar : public Registrar
 {
 public:
-  AgentRegistrar(
-      const mesos::internal::slave::Flags& slaveFlags,
-      const SlaveID& slaveId);
+  AgentRegistrar(process::Owned<state::Storage> storage);
 
   ~AgentRegistrar() override;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6f6413b6/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 0de4e79..72e8122 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -825,31 +825,8 @@ TEST_F(ResourceProviderRegistrarTest, AgentRegistrar)
   ResourceProviderID resourceProviderId;
   resourceProviderId.set_value("foo");
 
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-
-  const slave::Flags flags = CreateSlaveFlags();
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
-
-  Future<UpdateSlaveMessage> updateSlaveMessage =
-    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
-
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
-  ASSERT_SOME(slave);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  // The agent will send `UpdateSlaveMessage` after it has created its
-  // meta directories. Await the message to make sure the agent
-  // registrar can create its store in the meta hierarchy.
-  AWAIT_READY(updateSlaveMessage);
-
-  Try<Owned<Registrar>> registrar =
-    Registrar::create(flags, slaveRegisteredMessage->slave_id());
+  Owned<mesos::state::Storage> storage(new mesos::state::InMemoryStorage());
+  Try<Owned<Registrar>> registrar = Registrar::create(std::move(storage));
 
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());


[02/10] mesos git commit: Delayed construction of the agent's resource provider manager.

Posted by bb...@apache.org.
Delayed construction of the agent's resource provider manager.

By delaying the construction of the agent's resource provider manager
we prepare for a following patch introducing a dependency of the
resource provider manager on the agent's ID.

Depending on whether the agent was able to recover an agent ID from
its log or still needs to obtain one in a first registration with the
master, the earliest point at which we can construct the resource
provider manager differs: either at the end of recovery, or once the
agent has registered. To capture the code common to both paths we
introduce a helper function performing the necessary steps.

Review: https://reviews.apache.org/r/66308/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0424a662
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0424a662
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0424a662

Branch: refs/heads/master
Commit: 0424a6623d08440d8dbe5aff5ec2f18df7b93e24
Parents: defbe56
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue Apr 24 11:36:07 2018 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Apr 25 12:34:09 2018 +0200

----------------------------------------------------------------------
 src/slave/slave.cpp                           | 94 +++++++++++++++++-----
 src/slave/slave.hpp                           |  6 +-
 src/tests/resource_provider_manager_tests.cpp |  9 ++-
 3 files changed, 87 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0424a662/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d0ff5f8..d313777 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -30,6 +30,8 @@
 #include <utility>
 #include <vector>
 
+#include <glog/logging.h>
+
 #include <mesos/type_utils.hpp>
 
 #include <mesos/authentication/secret_generator.hpp>
@@ -772,15 +774,20 @@ void Slave::initialize()
           logRequest(request);
           return http.executor(request, principal);
         });
+  route(
+      "/api/v1/resource_provider",
+      READWRITE_HTTP_AUTHENTICATION_REALM,
+      Http::RESOURCE_PROVIDER_HELP(),
+      [this](const http::Request& request, const Option<Principal>& principal)
+        -> Future<http::Response> {
+        logRequest(request);
+
+        if (resourceProviderManager.get() == nullptr) {
+          return http::ServiceUnavailable();
+        }
 
-  route("/api/v1/resource_provider",
-        READWRITE_HTTP_AUTHENTICATION_REALM,
-        Http::RESOURCE_PROVIDER_HELP(),
-        [this](const http::Request& request,
-               const Option<Principal>& principal) {
-          logRequest(request);
-          return resourceProviderManager.api(request, principal);
-        });
+        return resourceProviderManager->api(request, principal);
+      });
 
   // TODO(ijimenez): Remove this endpoint at the end of the
   // deprecation cycle on 0.26.
@@ -1502,6 +1509,8 @@ void Slave::registered(
 
       CHECK_SOME(state::checkpoint(path, info));
 
+      initializeResourceProviderManager(flags, info.id());
+
       // We start the local resource providers daemon once the agent is
       // running, so the resource providers can use the agent API.
       localResourceProviderDaemon->start(info.id());
@@ -4344,7 +4353,7 @@ void Slave::applyOperation(const ApplyOperationMessage& message)
   }
 
   if (resourceProviderId.isSome()) {
-    resourceProviderManager.applyOperation(message);
+    CHECK_NOTNULL(resourceProviderManager.get())->applyOperation(message);
     return;
   }
 
@@ -4417,7 +4426,7 @@ void Slave::reconcileOperations(const ReconcileOperationsMessage& message)
   }
 
   if (containsResourceProviderOperations) {
-    resourceProviderManager.reconcileOperations(message);
+    CHECK_NOTNULL(resourceProviderManager.get())->reconcileOperations(message);
   }
 }
 
@@ -4549,7 +4558,19 @@ void Slave::operationStatusAcknowledgement(
 {
   Operation* operation = getOperation(acknowledgement.operation_uuid());
   if (operation != nullptr) {
-    resourceProviderManager.acknowledgeOperationStatus(acknowledgement);
+    // If the operation was on resource provider resources forward the
+    // acknowledgement to the resource provider manager as well.
+    Result<ResourceProviderID> resourceProviderId =
+      getResourceProviderId(operation->info());
+
+    CHECK(!resourceProviderId.isError())
+      << "Could not determine resource provider of operation " << operation
+      << ": " << resourceProviderId.error();
+
+    if (resourceProviderId.isSome()) {
+      CHECK_NOTNULL(resourceProviderManager.get())
+        ->acknowledgeOperationStatus(acknowledgement);
+    }
 
     CHECK(operation->statuses_size() > 0);
     if (protobuf::isTerminalState(
@@ -7319,10 +7340,8 @@ void Slave::__recover(const Future<Nothing>& future)
     detection = detector->detect()
       .onAny(defer(self(), &Slave::detected, lambda::_1));
 
-    if (capabilities.resourceProvider) {
-      // Start listening for messages from the resource provider manager.
-      resourceProviderManager.messages().get().onAny(
-          defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+    if (info.has_id()) {
+      initializeResourceProviderManager(flags, info.id());
     }
 
     // Forward oversubscribed resources.
@@ -7600,7 +7619,7 @@ void Slave::handleResourceProviderMessage(
                << (message.isFailed() ? message.failure() : "future discarded");
 
     // Wait for the next message.
-    resourceProviderManager.messages().get()
+    CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
       .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
 
     return;
@@ -7859,7 +7878,7 @@ void Slave::handleResourceProviderMessage(
   }
 
   // Wait for the next message.
-  resourceProviderManager.messages().get()
+  CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
     .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
 }
 
@@ -8114,6 +8133,24 @@ void Slave::apply(Operation* operation)
 Future<Nothing> Slave::publishResources(
     const Option<Resources>& additionalResources)
 {
+  // If the resource provider manager has not been created yet no resource
+  // providers have been added and we do not need to publish anything.
+  if (resourceProviderManager == nullptr) {
+    // We check whether the passed additional resources are compatible
+    // with the expectation that no resource provider resources are in
+    // use, yet. This is not an exhaustive consistency check.
+    if (additionalResources.isSome()) {
+      foreach (const Resource& resource, additionalResources.get()) {
+        CHECK(!resource.has_provider_id())
+          << "Cannot publish resource provider resources "
+          << additionalResources.get()
+          << " until resource providers have subscribed";
+      }
+    }
+
+    return Nothing();
+  }
+
   Resources resources;
 
   // NOTE: For resources providers that serve quantity-based resources
@@ -8134,7 +8171,8 @@ Future<Nothing> Slave::publishResources(
     resources += additionalResources.get();
   }
 
-  return resourceProviderManager.publishResources(resources);
+  return CHECK_NOTNULL(resourceProviderManager.get())
+    ->publishResources(resources);
 }
 
 
@@ -8754,6 +8792,26 @@ double Slave::_resources_revocable_percent(const string& name)
 }
 
 
+void Slave::initializeResourceProviderManager(
+    const Flags& flags,
+    const SlaveID& slaveId)
+{
+  // To simplify reasoning about lifetimes we do not allow
+  // reinitialization of the resource provider manager.
+  if (resourceProviderManager.get() != nullptr) {
+    return;
+  }
+
+  resourceProviderManager.reset(new ResourceProviderManager());
+
+  if (capabilities.resourceProvider) {
+    // Start listening for messages from the resource provider manager.
+    resourceProviderManager->messages().get().onAny(
+        defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+  }
+}
+
+
 Framework::Framework(
     Slave* _slave,
     const Flags& slaveFlags,

http://git-wip-us.apache.org/repos/asf/mesos/blob/0424a662/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index c35996b..c3866c6 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -711,6 +711,10 @@ private:
       const SlaveInfo& previous,
       const SlaveInfo& current) const;
 
+  void initializeResourceProviderManager(
+      const Flags& flags,
+      const SlaveID& slaveId);
+
   protobuf::master::Capabilities requiredMasterCapabilities;
 
   const Flags flags;
@@ -812,7 +816,7 @@ private:
   // (allocated and oversubscribable) resources.
   Option<Resources> oversubscribedResources;
 
-  ResourceProviderManager resourceProviderManager;
+  process::Owned<ResourceProviderManager> resourceProviderManager;
   process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
 
   // Local resource providers known by the agent.

http://git-wip-us.apache.org/repos/asf/mesos/blob/0424a662/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index c52541b..0de4e79 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -755,14 +755,17 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
-
   Owned<MasterDetector> detector = master.get()->createDetector();
 
+  // For the agent's resource provider manager to start,
+  // the agent needs to have been assigned an agent ID.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
   Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
   ASSERT_SOME(agent);
 
-  AWAIT_READY(__recover);
+  AWAIT_READY(slaveRegisteredMessage);
 
   // Wait for recovery to be complete.
   Clock::pause();


[09/10] mesos git commit: Set up recovery code paths of resource provider manager.

Posted by bb...@apache.org.
Set up recovery code paths of resource provider manager.

This patch adjusts the control flow of the resource provider manager
so that we can in the future make use of persisted resource provider
information. While this patch sets up the needed flow, it does not
yet implement the recovery logic itself.

Review: https://reviews.apache.org/r/66311/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/bfcf5571
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/bfcf5571
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/bfcf5571

Branch: refs/heads/master
Commit: bfcf5571869598a2e6d75550013fdaefa57dd6cb
Parents: e1e9a04
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue Apr 24 11:36:22 2018 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Apr 25 12:37:11 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 323 ++++++++++++---------
 src/resource_provider/registrar.cpp           |  91 +++---
 src/resource_provider/registrar.hpp           |  18 +-
 src/tests/resource_provider_manager_tests.cpp |  15 +-
 4 files changed, 260 insertions(+), 187 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/bfcf5571/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 259a810..6393e7a 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -36,6 +36,7 @@
 #include <process/metrics/metrics.hpp>
 
 #include <stout/hashmap.hpp>
+#include <stout/nothing.hpp>
 #include <stout/protobuf.hpp>
 #include <stout/uuid.hpp>
 
@@ -47,6 +48,7 @@
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
 
+#include "resource_provider/registry.hpp"
 #include "resource_provider/validation.hpp"
 
 namespace http = process::http;
@@ -60,6 +62,8 @@ using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::Registrar;
 
+using mesos::resource_provider::registry::Registry;
+
 using process::Failure;
 using process::Future;
 using process::Owned;
@@ -76,10 +80,10 @@ using process::wait;
 
 using process::http::Accepted;
 using process::http::BadRequest;
-using process::http::OK;
 using process::http::MethodNotAllowed;
 using process::http::NotAcceptable;
 using process::http::NotImplemented;
+using process::http::OK;
 using process::http::Pipe;
 using process::http::UnsupportedMediaType;
 
@@ -192,6 +196,11 @@ private:
       ResourceProvider* resourceProvider,
       const Call::UpdatePublishResourcesStatus& update);
 
+  Future<Nothing> recover(
+      const mesos::resource_provider::registry::Registry& registry);
+
+  void initialize() override;
+
   ResourceProviderID newResourceProviderId();
 
   double gaugeSubscribed();
@@ -209,6 +218,9 @@ private:
     Gauge subscribed;
   };
 
+  Owned<Registrar> registrar;
+  Promise<Nothing> recovered;
+
   Metrics metrics;
 };
 
@@ -216,152 +228,191 @@ private:
 ResourceProviderManagerProcess::ResourceProviderManagerProcess(
     Owned<Registrar> _registrar)
   : ProcessBase(process::ID::generate("resource-provider-manager")),
+    registrar(std::move(_registrar)),
     metrics(*this)
 {
-  CHECK_NOTNULL(_registrar.get());
+  CHECK_NOTNULL(registrar.get());
 }
 
 
-Future<http::Response> ResourceProviderManagerProcess::api(
-    const http::Request& request,
-    const Option<Principal>& principal)
+void ResourceProviderManagerProcess::initialize()
 {
-  if (request.method != "POST") {
-    return MethodNotAllowed({"POST"}, request.method);
-  }
-
-  v1::resource_provider::Call v1Call;
-
-  // TODO(anand): Content type values are case-insensitive.
-  Option<string> contentType = request.headers.get("Content-Type");
-
-  if (contentType.isNone()) {
-    return BadRequest("Expecting 'Content-Type' to be present");
-  }
-
-  if (contentType.get() == APPLICATION_PROTOBUF) {
-    if (!v1Call.ParseFromString(request.body)) {
-      return BadRequest("Failed to parse body into Call protobuf");
-    }
-  } else if (contentType.get() == APPLICATION_JSON) {
-    Try<JSON::Value> value = JSON::parse(request.body);
-    if (value.isError()) {
-      return BadRequest("Failed to parse body into JSON: " + value.error());
-    }
-
-    Try<v1::resource_provider::Call> parse =
-      ::protobuf::parse<v1::resource_provider::Call>(value.get());
-
-    if (parse.isError()) {
-      return BadRequest("Failed to convert JSON into Call protobuf: " +
-                        parse.error());
-    }
-
-    v1Call = parse.get();
-  } else {
-    return UnsupportedMediaType(
-        string("Expecting 'Content-Type' of ") +
-        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
-  }
-
-  Call call = devolve(v1Call);
-
-  Option<Error> error = validate(call);
-  if (error.isSome()) {
-    return BadRequest(
-        "Failed to validate resource_provider::Call: " + error->message);
-  }
-
-  if (call.type() == Call::SUBSCRIBE) {
-    // We default to JSON 'Content-Type' in the response since an empty
-    // 'Accept' header results in all media types considered acceptable.
-    ContentType acceptType = ContentType::JSON;
-
-    if (request.acceptsMediaType(APPLICATION_JSON)) {
-      acceptType = ContentType::JSON;
-    } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
-      acceptType = ContentType::PROTOBUF;
-    } else {
-      return NotAcceptable(
-          string("Expecting 'Accept' to allow ") +
-          "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
-    }
-
-    if (request.headers.contains("Mesos-Stream-Id")) {
-      return BadRequest(
-          "Subscribe calls should not include the 'Mesos-Stream-Id' header");
-    }
-
-    Pipe pipe;
-    OK ok;
-
-    ok.headers["Content-Type"] = stringify(acceptType);
-    ok.type = http::Response::PIPE;
-    ok.reader = pipe.reader();
-
-    // Generate a stream ID and return it in the response.
-    id::UUID streamId = id::UUID::random();
-    ok.headers["Mesos-Stream-Id"] = streamId.toString();
-
-    HttpConnection http(pipe.writer(), acceptType, streamId);
-    subscribe(http, call.subscribe());
-
-    return ok;
-  }
-
-  if (!resourceProviders.subscribed.contains(call.resource_provider_id())) {
-    return BadRequest("Resource provider is not subscribed");
-  }
-
-  ResourceProvider* resourceProvider =
-    resourceProviders.subscribed.at(call.resource_provider_id()).get();
-
-  // This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
-  if (!request.headers.contains("Mesos-Stream-Id")) {
-    return BadRequest(
-        "All non-subscribe calls should include to 'Mesos-Stream-Id' header");
-  }
-
-  const string& streamId = request.headers.at("Mesos-Stream-Id");
-  if (streamId != resourceProvider->http.streamId.toString()) {
-    return BadRequest(
-        "The stream ID '" + streamId + "' included in this request "
-        "didn't match the stream ID currently associated with "
-        " resource provider ID " + resourceProvider->info.id().value());
-  }
-
-  switch(call.type()) {
-    case Call::UNKNOWN: {
-      return NotImplemented();
-    }
-
-    case Call::SUBSCRIBE: {
-      // `SUBSCRIBE` call should have been handled above.
-      LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
-    }
+  // Recover the registrar.
+  registrar->recover()
+    .then(defer(self(), &ResourceProviderManagerProcess::recover, lambda::_1))
+    .onAny([](const Future<Nothing>& recovered) {
+      if (!recovered.isReady()) {
+        LOG(FATAL)
+        << "Failed to recover resource provider manager registry: "
+        << recovered;
+      }
+    });
+}
 
-    case Call::UPDATE_OPERATION_STATUS: {
-      updateOperationStatus(
-          resourceProvider,
-          call.update_operation_status());
 
-      return Accepted();
-    }
+Future<Nothing> ResourceProviderManagerProcess::recover(
+    const mesos::resource_provider::registry::Registry& registry)
+{
+  recovered.set(Nothing());
 
-    case Call::UPDATE_STATE: {
-      updateState(resourceProvider, call.update_state());
-      return Accepted();
-    }
+  return Nothing();
+}
 
-    case Call::UPDATE_PUBLISH_RESOURCES_STATUS: {
-      updatePublishResourcesStatus(
-          resourceProvider,
-          call.update_publish_resources_status());
-      return Accepted();
-    }
-  }
 
-  UNREACHABLE();
+Future<http::Response> ResourceProviderManagerProcess::api(
+    const http::Request& request,
+    const Option<Principal>& principal)
+{
+  // TODO(bbannier): This implementation does not limit the number of messages
+  // in the actor's inbox which could become large should a big number of
+  // resource providers attempt to subscribe before recovery completed. Consider
+  // rejecting requests until the resource provider manager has recovered. This
+  // would likely require implementing retry logic in resource providers.
+  return recovered.future().then(defer(
+      self(), [this, request, principal](const Nothing&) -> http::Response {
+        if (request.method != "POST") {
+          return MethodNotAllowed({"POST"}, request.method);
+        }
+
+        v1::resource_provider::Call v1Call;
+
+        // TODO(anand): Content type values are case-insensitive.
+        Option<string> contentType = request.headers.get("Content-Type");
+
+        if (contentType.isNone()) {
+          return BadRequest("Expecting 'Content-Type' to be present");
+        }
+
+        if (contentType.get() == APPLICATION_PROTOBUF) {
+          if (!v1Call.ParseFromString(request.body)) {
+            return BadRequest("Failed to parse body into Call protobuf");
+          }
+        } else if (contentType.get() == APPLICATION_JSON) {
+          Try<JSON::Value> value = JSON::parse(request.body);
+          if (value.isError()) {
+            return BadRequest(
+                "Failed to parse body into JSON: " + value.error());
+          }
+
+          Try<v1::resource_provider::Call> parse =
+            ::protobuf::parse<v1::resource_provider::Call>(value.get());
+
+          if (parse.isError()) {
+            return BadRequest(
+                "Failed to convert JSON into Call protobuf: " + parse.error());
+          }
+
+          v1Call = parse.get();
+        } else {
+          return UnsupportedMediaType(
+              string("Expecting 'Content-Type' of ") + APPLICATION_JSON +
+              " or " + APPLICATION_PROTOBUF);
+        }
+
+        Call call = devolve(v1Call);
+
+        Option<Error> error = validate(call);
+        if (error.isSome()) {
+          return BadRequest(
+              "Failed to validate resource_provider::Call: " + error->message);
+        }
+
+        if (call.type() == Call::SUBSCRIBE) {
+          // We default to JSON 'Content-Type' in the response since an empty
+          // 'Accept' header results in all media types considered acceptable.
+          ContentType acceptType = ContentType::JSON;
+
+          if (request.acceptsMediaType(APPLICATION_JSON)) {
+            acceptType = ContentType::JSON;
+          } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
+            acceptType = ContentType::PROTOBUF;
+          } else {
+            return NotAcceptable(
+                string("Expecting 'Accept' to allow ") + "'" +
+                APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
+          }
+
+          if (request.headers.contains("Mesos-Stream-Id")) {
+            return BadRequest(
+                "Subscribe calls should not include the 'Mesos-Stream-Id' "
+                "header");
+          }
+
+          Pipe pipe;
+          OK ok;
+
+          ok.headers["Content-Type"] = stringify(acceptType);
+          ok.type = http::Response::PIPE;
+          ok.reader = pipe.reader();
+
+          // Generate a stream ID and return it in the response.
+          id::UUID streamId = id::UUID::random();
+          ok.headers["Mesos-Stream-Id"] = streamId.toString();
+
+          HttpConnection http(pipe.writer(), acceptType, streamId);
+          this->subscribe(http, call.subscribe());
+
+          return std::move(ok);
+        }
+
+        if (!this->resourceProviders.subscribed.contains(
+                call.resource_provider_id())) {
+          return BadRequest("Resource provider is not subscribed");
+        }
+
+        ResourceProvider* resourceProvider =
+          this->resourceProviders.subscribed.at(call.resource_provider_id())
+            .get();
+
+        // This isn't a `SUBSCRIBE` call, so the request should include a stream
+        // ID.
+        if (!request.headers.contains("Mesos-Stream-Id")) {
+          return BadRequest(
+              "All non-subscribe calls should include to 'Mesos-Stream-Id' "
+              "header");
+        }
+
+        const string& streamId = request.headers.at("Mesos-Stream-Id");
+        if (streamId != resourceProvider->http.streamId.toString()) {
+          return BadRequest(
+              "The stream ID '" + streamId +
+              "' included in this request "
+              "didn't match the stream ID currently associated with "
+              " resource provider ID " +
+              resourceProvider->info.id().value());
+        }
+
+        switch (call.type()) {
+          case Call::UNKNOWN: {
+            return NotImplemented();
+          }
+
+          case Call::SUBSCRIBE: {
+            // `SUBSCRIBE` call should have been handled above.
+            LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
+          }
+
+          case Call::UPDATE_OPERATION_STATUS: {
+            this->updateOperationStatus(
+                resourceProvider, call.update_operation_status());
+
+            return Accepted();
+          }
+
+          case Call::UPDATE_STATE: {
+            this->updateState(resourceProvider, call.update_state());
+            return Accepted();
+          }
+
+          case Call::UPDATE_PUBLISH_RESOURCES_STATUS: {
+            this->updatePublishResourcesStatus(
+                resourceProvider, call.update_publish_resources_status());
+            return Accepted();
+          }
+        }
+
+        UNREACHABLE();
+      }));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bfcf5571/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index b151e2b..d7ec6a6 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -92,9 +92,11 @@ Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage)
 }
 
 
-Try<Owned<Registrar>> Registrar::create(master::Registrar* registrar)
+Try<Owned<Registrar>> Registrar::create(
+    master::Registrar* registrar,
+    Registry registry)
 {
-  return new MasterRegistrar(registrar);
+  return new MasterRegistrar(registrar, std::move(registry));
 }
 
 
@@ -150,28 +152,29 @@ class GenericRegistrarProcess : public Process<GenericRegistrarProcess>
 public:
   GenericRegistrarProcess(Owned<Storage> storage);
 
-  Future<Nothing> recover();
+  Future<Registry> recover();
 
   Future<bool> apply(Owned<Registrar::Operation> operation);
 
-  Future<bool> _apply(Owned<Registrar::Operation> operation);
-
   void update();
 
+  void initialize() override;
+
+private:
+  Future<bool> _apply(Owned<Registrar::Operation> operation);
+
   void _update(
       const Future<Option<Variable<Registry>>>& store,
-      const Registry& updatedRegistry,
       deque<Owned<Registrar::Operation>> applied);
 
-private:
+
   Owned<Storage> storage;
 
   // Use fully qualified type for `State` to disambiguate with `State`
   // enumeration in `ProcessBase`.
   mesos::state::protobuf::State state;
 
-  Option<Future<Nothing>> recovered;
-  Option<Registry> registry;
+  Promise<Nothing> recovered;
   Option<Variable<Registry>> variable;
 
   Option<Error> error;
@@ -191,32 +194,36 @@ GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
 }
 
 
-Future<Nothing> GenericRegistrarProcess::recover()
+void GenericRegistrarProcess::initialize()
 {
   constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";
 
-  if (recovered.isNone()) {
-    recovered = state.fetch<Registry>(NAME).then(
-        defer(self(), [this](const Variable<Registry>& recovery) {
-          registry = recovery.get();
-          variable = recovery;
+  CHECK_NONE(variable);
+
+  recovered.associate(state.fetch<Registry>(NAME).then(
+      defer(self(), [this](const Variable<Registry>& recovery) {
+        variable = recovery;
+        return Nothing();
+      })));
+}
 
-          return Nothing();
-        }));
-  }
 
-  return recovered.get();
+Future<Registry> GenericRegistrarProcess::recover()
+{
+  // Prevent discards on the returned `Future` by marking the result as
+  // `undiscardable` so that we control the lifetime of the recovering registry.
+  return undiscardable(recovered.future()).then([this](const Nothing&) {
+    CHECK_SOME(this->variable);
+    return this->variable->get();
+  });
 }
 
 
 Future<bool> GenericRegistrarProcess::apply(
     Owned<Registrar::Operation> operation)
 {
-  if (recovered.isNone()) {
-    return Failure("Attempted to apply the operation before recovering");
-  }
-
-  return recovered->then(defer(self(), &Self::_apply, std::move(operation)));
+  return undiscardable(recovered.future()).then(
+      defer(self(), &Self::_apply, std::move(operation)));
 }
 
 
@@ -249,8 +256,9 @@ void GenericRegistrarProcess::update()
 
   updating = true;
 
-  CHECK_SOME(registry);
-  Registry updatedRegistry = registry.get();
+  CHECK_SOME(variable);
+
+  Registry updatedRegistry = variable->get();
 
   foreach (Owned<Registrar::Operation>& operation, operations) {
     Try<bool> operationResult = (*operation)(&updatedRegistry);
@@ -272,7 +280,6 @@ void GenericRegistrarProcess::update()
       self(),
       &Self::_update,
       lambda::_1,
-      updatedRegistry,
       std::move(operations)));
 
   operations.clear();
@@ -281,7 +288,6 @@ void GenericRegistrarProcess::update()
 
 void GenericRegistrarProcess::_update(
     const Future<Option<Variable<Registry>>>& store,
-    const Registry& updatedRegistry,
     deque<Owned<Registrar::Operation>> applied)
 {
   updating = false;
@@ -310,7 +316,6 @@ void GenericRegistrarProcess::_update(
   }
 
   variable = store->get();
-  registry = updatedRegistry;
 
   // Remove the operations.
   while (!applied.empty()) {
@@ -340,7 +345,7 @@ GenericRegistrar::~GenericRegistrar()
 }
 
 
-Future<Nothing> GenericRegistrar::recover()
+Future<Registry> GenericRegistrar::recover()
 {
   return dispatch(process.get(), &GenericRegistrarProcess::recover);
 }
@@ -364,6 +369,8 @@ class MasterRegistrarProcess : public Process<MasterRegistrarProcess>
   public:
     AdaptedOperation(Owned<Registrar::Operation> operation);
 
+    Future<registry::Registry> recover();
+
   private:
     Try<bool> perform(internal::Registry* registry, hashset<SlaveID>*) override;
 
@@ -376,12 +383,17 @@ class MasterRegistrarProcess : public Process<MasterRegistrarProcess>
   };
 
 public:
-  explicit MasterRegistrarProcess(master::Registrar* registrar);
+  explicit MasterRegistrarProcess(
+      master::Registrar* registrar,
+      Registry registry);
 
   Future<bool> apply(Owned<Registrar::Operation> operation);
 
+  Future<registry::Registry> recover() { return registry; }
+
 private:
   master::Registrar* registrar = nullptr;
+  Registry registry;
 };
 
 
@@ -398,9 +410,12 @@ Try<bool> MasterRegistrarProcess::AdaptedOperation::perform(
 }
 
 
-MasterRegistrarProcess::MasterRegistrarProcess(master::Registrar* _registrar)
+MasterRegistrarProcess::MasterRegistrarProcess(
+    master::Registrar* _registrar,
+    registry::Registry _registry)
   : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
-    registrar(_registrar) {}
+    registrar(_registrar),
+    registry(std::move(_registry)) {}
 
 
 Future<bool> MasterRegistrarProcess::apply(
@@ -413,8 +428,10 @@ Future<bool> MasterRegistrarProcess::apply(
 }
 
 
-MasterRegistrar::MasterRegistrar(master::Registrar* registrar)
-  : process(new MasterRegistrarProcess(registrar))
+MasterRegistrar::MasterRegistrar(
+    master::Registrar* registrar,
+    registry::Registry registry)
+  : process(new MasterRegistrarProcess(registrar, std::move(registry)))
 {
   spawn(process.get(), false);
 }
@@ -427,9 +444,9 @@ MasterRegistrar::~MasterRegistrar()
 }
 
 
-Future<Nothing> MasterRegistrar::recover()
+Future<Registry> MasterRegistrar::recover()
 {
-  return Nothing();
+  return dispatch(process.get(), &MasterRegistrarProcess::recover);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bfcf5571/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
index 3c10785..ded56e1 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -71,12 +71,16 @@ public:
       process::Owned<state::Storage> storage);
 
   // Create a registry on top of a master's persistent state.
+  //
+  // The created registrar does not take ownership of the passed registrar
+  // which needs to be valid as long as the created registrar is alive.
   static Try<process::Owned<Registrar>> create(
-      mesos::internal::master::Registrar* registrar);
+      mesos::internal::master::Registrar* registrar,
+      registry::Registry registry);
 
   virtual ~Registrar() = default;
 
-  virtual process::Future<Nothing> recover() = 0;
+  virtual process::Future<registry::Registry> recover() = 0;
   virtual process::Future<bool> apply(process::Owned<Operation> operation) = 0;
 };
 
@@ -115,7 +119,7 @@ public:
 
   ~GenericRegistrar() override;
 
-  process::Future<Nothing> recover() override;
+  process::Future<registry::Registry> recover() override;
 
   process::Future<bool> apply(process::Owned<Operation> operation) override;
 
@@ -130,13 +134,17 @@ class MasterRegistrarProcess;
 class MasterRegistrar : public Registrar
 {
 public:
-  explicit MasterRegistrar(mesos::internal::master::Registrar* Registrar);
+  // The created registrar does not take ownership of the passed registrar
+  // which needs to be valid as long as the created registrar is alive.
+  explicit MasterRegistrar(
+      mesos::internal::master::Registrar* registrar,
+      registry::Registry registry);
 
   ~MasterRegistrar() override;
 
   // This registrar performs no recovery; instead to recover
   // the underlying master registrar needs to be recovered.
-  process::Future<Nothing> recover() override;
+  process::Future<registry::Registry> recover() override;
 
   process::Future<bool> apply(process::Owned<Operation> operation) override;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/bfcf5571/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 1664073..eb8e4fc 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -841,10 +841,6 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
-  // Applying operations on a not yet recovered registrar fails.
-  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
-      new AdmitResourceProvider(resourceProviderId))));
-
   AWAIT_READY(registrar.get()->recover());
 
   Future<bool> admitResourceProvider =
@@ -873,15 +869,16 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
 
   const MasterInfo masterInfo = protobuf::createMasterInfo({});
 
-  Try<Owned<Registrar>> registrar = Registrar::create(&masterRegistrar);
+  Future<Registry> registry = masterRegistrar.recover(masterInfo);
+  AWAIT_READY(registry);
+
+  Try<Owned<Registrar>> registrar = Registrar::create(
+      &masterRegistrar,
+      registry->resource_provider_registry());
 
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
-  // Applying operations on a not yet recovered registrar fails.
-  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
-      new AdmitResourceProvider(resourceProviderId))));
-
   AWAIT_READY(masterRegistrar.recover(masterInfo));
 
   Future<bool> admitResourceProvider =


[04/10] mesos git commit: Renamed resource provider `AgentRegistrar` to `GenericRegistrar`.

Posted by bb...@apache.org.
Renamed resource provider `AgentRegistrar` to `GenericRegistrar`.

This registrar and its matching process can work on generic storage,
and it is currently used to work on an agent's persistent store.

Its name should not be tied to the agent, so we rename the registrar
in this patch.

Review: https://reviews.apache.org/r/66526/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f6becd6d
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f6becd6d
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f6becd6d

Branch: refs/heads/master
Commit: f6becd6d6f3c8cee779903794ebcfb4f68405358
Parents: 6f6413b
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue Apr 24 11:36:15 2018 +0200
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Wed Apr 25 12:37:09 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/registrar.cpp           | 35 +++++++++++-----------
 src/resource_provider/registrar.hpp           | 10 +++----
 src/tests/resource_provider_manager_tests.cpp |  4 +--
 3 files changed, 25 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f6becd6d/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index dbb55dd..53b403e 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -88,7 +88,7 @@ bool Registrar::Operation::set()
 
 Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage)
 {
-  return new AgentRegistrar(std::move(storage));
+  return new GenericRegistrar(std::move(storage));
 }
 
 
@@ -145,10 +145,10 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
 }
 
 
-class AgentRegistrarProcess : public Process<AgentRegistrarProcess>
+class GenericRegistrarProcess : public Process<GenericRegistrarProcess>
 {
 public:
-  AgentRegistrarProcess(Owned<Storage> storage);
+  GenericRegistrarProcess(Owned<Storage> storage);
 
   Future<Nothing> recover();
 
@@ -182,13 +182,13 @@ private:
 };
 
 
-AgentRegistrarProcess::AgentRegistrarProcess(Owned<Storage> _storage)
-  : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
+GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
+  : ProcessBase(process::ID::generate("resource-provider-generic-registrar")),
     storage(std::move(_storage)),
     state(storage.get()) {}
 
 
-Future<Nothing> AgentRegistrarProcess::recover()
+Future<Nothing> GenericRegistrarProcess::recover()
 {
   constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";
 
@@ -206,7 +206,8 @@ Future<Nothing> AgentRegistrarProcess::recover()
 }
 
 
-Future<bool> AgentRegistrarProcess::apply(Owned<Registrar::Operation> operation)
+Future<bool> GenericRegistrarProcess::apply(
+    Owned<Registrar::Operation> operation)
 {
   if (recovered.isNone()) {
     return Failure("Attempted to apply the operation before recovering");
@@ -216,7 +217,7 @@ Future<bool> AgentRegistrarProcess::apply(Owned<Registrar::Operation> operation)
 }
 
 
-Future<bool> AgentRegistrarProcess::_apply(
+Future<bool> GenericRegistrarProcess::_apply(
     Owned<Registrar::Operation> operation)
 {
   if (error.isSome()) {
@@ -234,7 +235,7 @@ Future<bool> AgentRegistrarProcess::_apply(
 }
 
 
-void AgentRegistrarProcess::update()
+void GenericRegistrarProcess::update()
 {
   CHECK(!updating);
   CHECK_NONE(error);
@@ -275,7 +276,7 @@ void AgentRegistrarProcess::update()
 }
 
 
-void AgentRegistrarProcess::_update(
+void GenericRegistrarProcess::_update(
     const Future<Option<Variable<Registry>>>& store,
     const Registry& updatedRegistry,
     deque<Owned<Registrar::Operation>> applied)
@@ -322,31 +323,31 @@ void AgentRegistrarProcess::_update(
 }
 
 
-AgentRegistrar::AgentRegistrar(Owned<Storage> storage)
-  : process(new AgentRegistrarProcess(std::move(storage)))
+GenericRegistrar::GenericRegistrar(Owned<Storage> storage)
+  : process(new GenericRegistrarProcess(std::move(storage)))
 {
   process::spawn(process.get(), false);
 }
 
 
-AgentRegistrar::~AgentRegistrar()
+GenericRegistrar::~GenericRegistrar()
 {
   process::terminate(*process);
   process::wait(*process);
 }
 
 
-Future<Nothing> AgentRegistrar::recover()
+Future<Nothing> GenericRegistrar::recover()
 {
-  return dispatch(process.get(), &AgentRegistrarProcess::recover);
+  return dispatch(process.get(), &GenericRegistrarProcess::recover);
 }
 
 
-Future<bool> AgentRegistrar::apply(Owned<Operation> operation)
+Future<bool> GenericRegistrar::apply(Owned<Operation> operation)
 {
   return dispatch(
       process.get(),
-      &AgentRegistrarProcess::apply,
+      &GenericRegistrarProcess::apply,
       std::move(operation));
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f6becd6d/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
index 34cb166..3c10785 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -105,22 +105,22 @@ private:
 };
 
 
-class AgentRegistrarProcess;
+class GenericRegistrarProcess;
 
 
-class AgentRegistrar : public Registrar
+class GenericRegistrar : public Registrar
 {
 public:
-  AgentRegistrar(process::Owned<state::Storage> storage);
+  GenericRegistrar(process::Owned<state::Storage> storage);
 
-  ~AgentRegistrar() override;
+  ~GenericRegistrar() override;
 
   process::Future<Nothing> recover() override;
 
   process::Future<bool> apply(process::Owned<Operation> operation) override;
 
 private:
-  std::unique_ptr<AgentRegistrarProcess> process;
+  std::unique_ptr<GenericRegistrarProcess> process;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f6becd6d/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 72e8122..ceb7854 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -819,8 +819,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
 class ResourceProviderRegistrarTest : public tests::MesosTest {};
 
 
-// Test that the agent resource provider registrar works as expected.
-TEST_F(ResourceProviderRegistrarTest, AgentRegistrar)
+// Test that the generic resource provider registrar works as expected.
+TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
 {
   ResourceProviderID resourceProviderId;
   resourceProviderId.set_value("foo");