You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by al...@apache.org on 2018/04/25 15:13:05 UTC

[1/9] mesos git commit: Revert "Removed redundant master flags in resource provider tests."

Repository: mesos
Updated Branches:
  refs/heads/master 1cbda33bb -> a1c6a7a3c


Revert "Removed redundant master flags in resource provider tests."

This reverts commit b1538db6b9d12633195e0d5a0eff48246ea0d4ad.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/0f3ec500
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/0f3ec500
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/0f3ec500

Branch: refs/heads/master
Commit: 0f3ec500d04c860b2334e6bea040f718dfece555
Parents: 1cbda33
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:07:01 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:07:01 2018 +0200

----------------------------------------------------------------------
 src/tests/resource_provider_manager_tests.cpp | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/0f3ec500/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 889cb32..1ee4dc3 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -914,7 +914,8 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
 TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
 {
   // Start master and agent.
-  Try<Owned<cluster::Master>> master = StartMaster();
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
@@ -1049,7 +1050,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
   Clock::pause();
 
   // Start master and agent.
-  Try<Owned<cluster::Master>> master = StartMaster();
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
@@ -1303,7 +1305,8 @@ TEST_F(ResourceProviderManagerHttpApiTest, ResourceProviderSubscribeDisconnect)
   Clock::pause();
 
   // Start master and agent.
-  Try<Owned<cluster::Master>> master = StartMaster();
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
@@ -1379,7 +1382,8 @@ TEST_F(ResourceProviderManagerHttpApiTest, Metrics)
   Clock::pause();
 
   // Start master and agent.
-  Try<Owned<cluster::Master>> master = StartMaster();
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();


[2/9] mesos git commit: Revert "Prevent resubscription of resource providers with unknown IDs."

Posted by al...@apache.org.
Revert "Prevent resubscription of resource providers with unknown IDs."

This reverts commit 1679d4c85c0bd095e2dafaf7a95adde258db8179.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f13a6e22
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f13a6e22
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f13a6e22

Branch: refs/heads/master
Commit: f13a6e22f7019f9bd5e8941f54fe4616df1972f4
Parents: 0f3ec50
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:07:10 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:07:10 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 13 +----
 src/tests/resource_provider_manager_tests.cpp | 61 ----------------------
 2 files changed, 1 insertion(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f13a6e22/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 05911de..1219bb7 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -678,18 +678,7 @@ void ResourceProviderManagerProcess::subscribe(
     // restarted or an agent failover. The 'ResourceProviderInfo' might
     // have been updated, but its type and name should remain the same.
     // We should checkpoint its 'type', 'name' and ID, then check if the
-    // resubscription is consistent with the checkpointed record.
-
-    const ResourceProviderID& resourceProviderId = resourceProviderInfo.id();
-
-    if (!resourceProviders.known.contains(resourceProviderId)) {
-      LOG(INFO)
-        << "Dropping resubscription attempt of resource provider with ID "
-        << resourceProviderId
-        << " since it is unknown";
-
-      return;
-    }
+    // resubscribption is consistent with the checkpointed record.
 
     // If the resource provider is known we do not need to admit it
     // again, and the registrar operation implicitly succeeded.

http://git-wip-us.apache.org/repos/asf/mesos/blob/f13a6e22/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 1ee4dc3..8c364a1 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -1150,67 +1150,6 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
 }
 
 
-// Test that when a resource provider attempts to resubscribe with an
-// unknown ID it is not admitted but disconnected.
-TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeUnknownID)
-{
-  Clock::pause();
-
-  // Start master and agent.
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-
-  slave::Flags slaveFlags = CreateSlaveFlags();
-
-  // For the agent's resource provider manager to start,
-  // the agent needs to have been assigned an agent ID.
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
-
-  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
-  ASSERT_SOME(agent);
-
-  Clock::advance(slaveFlags.registration_backoff_factor);
-  Clock::settle();
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  mesos::v1::ResourceProviderID resourceProviderId;
-  resourceProviderId.set_value(id::UUID::random().toString());
-
-  mesos::v1::ResourceProviderInfo resourceProviderInfo;
-  resourceProviderInfo.mutable_id()->CopyFrom(resourceProviderId);
-  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
-  resourceProviderInfo.set_name("test");
-
-  Owned<v1::MockResourceProvider> resourceProvider(
-      new v1::MockResourceProvider(resourceProviderInfo));
-
-  // We explicitly reset the resource provider after the expected
-  // disconnect to prevent it from resubscribing indefinitely.
-  Future<Nothing> disconnected;
-  EXPECT_CALL(*resourceProvider, disconnected())
-    .WillOnce(DoAll(
-        Invoke([&resourceProvider]() { resourceProvider.reset(); }),
-        FutureSatisfy(&disconnected)));
-
-  // Start and register a resource provider.
-  Owned<EndpointDetector> endpointDetector(
-      resource_provider::createEndpointDetector(agent.get()->pid));
-
-  const ContentType contentType = GetParam();
-
-  resourceProvider->start(
-      endpointDetector,
-      contentType,
-      v1::DEFAULT_CREDENTIAL);
-
-  AWAIT_READY(disconnected);
-}
-
-
 // This test verifies that a disconnected resource provider will
 // result in an `UpdateSlaveMessage` to be sent to the master and the
 // total resources of the disconnected resource provider will be


[3/9] mesos git commit: Revert "Added admitted resource providers to the manager's registry."

Posted by al...@apache.org.
Revert "Added admitted resource providers to the manager's registry."

This reverts commit 23ff6622fde41f40ee1d4ee914b5302c7b25de01.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2855a6b6
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2855a6b6
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2855a6b6

Branch: refs/heads/master
Commit: 2855a6b607510e4114b22bddf43197c72176995e
Parents: f13a6e2
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:07:20 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:07:20 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 45 ++--------------------
 src/resource_provider/registrar.cpp           | 10 -----
 src/resource_provider/registry.proto          |  1 -
 src/tests/resource_provider_manager_tests.cpp | 28 ++++----------
 4 files changed, 11 insertions(+), 73 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2855a6b6/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 1219bb7..32f23a7 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -58,7 +58,6 @@ using std::string;
 
 using mesos::internal::resource_provider::validation::call::validate;
 
-using mesos::resource_provider::AdmitResourceProvider;
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::Registrar;
@@ -185,10 +184,6 @@ private:
       const HttpConnection& http,
       const Call::Subscribe& subscribe);
 
-  void _subscribe(
-      const Future<bool>& admitResourceProvider,
-      Owned<ResourceProvider> resourceProvider);
-
   void updateOperationStatus(
       ResourceProvider* resourceProvider,
       const Call::UpdateOperationStatus& update);
@@ -662,53 +657,17 @@ void ResourceProviderManagerProcess::subscribe(
   Owned<ResourceProvider> resourceProvider(
       new ResourceProvider(resourceProviderInfo, http));
 
-  Future<bool> admitResourceProvider;
-
   if (!resourceProviderInfo.has_id()) {
     // The resource provider is subscribing for the first time.
     resourceProvider->info.mutable_id()->CopyFrom(newResourceProviderId());
-
-    // If we are handing out a new `ResourceProviderID` persist the ID by
-    // triggering a `AdmitResourceProvider` operation on the registrar.
-    admitResourceProvider =
-      registrar->apply(Owned<mesos::resource_provider::Registrar::Operation>(
-          new AdmitResourceProvider(resourceProvider->info.id())));
   } else {
     // TODO(chhsiao): The resource provider is resubscribing after being
     // restarted or an agent failover. The 'ResourceProviderInfo' might
     // have been updated, but its type and name should remain the same.
     // We should checkpoint its 'type', 'name' and ID, then check if the
     // resubscribption is consistent with the checkpointed record.
-
-    // If the resource provider is known we do not need to admit it
-    // again, and the registrar operation implicitly succeeded.
-    admitResourceProvider = true;
   }
 
-  admitResourceProvider.onAny(defer(
-      self(),
-      &ResourceProviderManagerProcess::_subscribe,
-      lambda::_1,
-      std::move(resourceProvider)));
-}
-
-
-void ResourceProviderManagerProcess::_subscribe(
-    const Future<bool>& admitResourceProvider,
-    Owned<ResourceProvider> resourceProvider)
-{
-  if (!admitResourceProvider.isReady()) {
-    LOG(INFO)
-      << "Not subscribing resource provider " << resourceProvider->info.id()
-      << " as registry update did not succeed: " << admitResourceProvider;
-
-    return;
-  }
-
-  CHECK(admitResourceProvider.get())
-    << "Could not admit resource provider " << resourceProvider->info.id()
-    << " as registry update was rejected";
-
   const ResourceProviderID& resourceProviderId = resourceProvider->info.id();
 
   Event event;
@@ -722,7 +681,7 @@ void ResourceProviderManagerProcess::_subscribe(
     return;
   }
 
-  resourceProvider->http.closed()
+  http.closed()
     .onAny(defer(self(), [=](const Future<Nothing>& future) {
       // Iff the remote side closes the HTTP connection, the future will be
       // ready. We will remove the resource provider in that case.
@@ -757,6 +716,8 @@ void ResourceProviderManagerProcess::_subscribe(
     resourceProviders.known.put(
         resourceProviderId,
         std::move(resourceProvider_));
+
+    // TODO(bbannier): Persist this information in the registry.
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/2855a6b6/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index c506ec1..d7ec6a6 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -115,15 +115,6 @@ Try<bool> AdmitResourceProvider::perform(Registry* registry)
     return Error("Resource provider already admitted");
   }
 
-  if (std::find_if(
-          registry->removed_resource_providers().begin(),
-          registry->removed_resource_providers().end(),
-          [this](const ResourceProvider& resourceProvider) {
-            return resourceProvider.id() == this->id;
-          }) != registry->removed_resource_providers().end()) {
-    return Error("Resource provider was removed");
-  }
-
   ResourceProvider resourceProvider;
   resourceProvider.mutable_id()->CopyFrom(id);
 
@@ -150,7 +141,6 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
     return Error("Attempted to remove an unknown resource provider");
   }
 
-  registry->add_removed_resource_providers()->CopyFrom(*pos);
   registry->mutable_resource_providers()->erase(pos);
 
   return true; // Mutation.

http://git-wip-us.apache.org/repos/asf/mesos/blob/2855a6b6/src/resource_provider/registry.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.proto b/src/resource_provider/registry.proto
index 491263e..14bd433 100644
--- a/src/resource_provider/registry.proto
+++ b/src/resource_provider/registry.proto
@@ -35,5 +35,4 @@ message ResourceProvider {
 // A top level object that is managed by the Registrar and persisted.
 message Registry {
   repeated ResourceProvider resource_providers = 1;
-  repeated ResourceProvider removed_resource_providers = 2;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/2855a6b6/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 8c364a1..eb8e4fc 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -843,24 +843,17 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
 
   AWAIT_READY(registrar.get()->recover());
 
-  Future<bool> admitResourceProvider1 =
+  Future<bool> admitResourceProvider =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new AdmitResourceProvider(resourceProviderId)));
-  AWAIT_READY(admitResourceProvider1);
-  EXPECT_TRUE(admitResourceProvider1.get());
+  AWAIT_READY(admitResourceProvider);
+  EXPECT_TRUE(admitResourceProvider.get());
 
   Future<bool> removeResourceProvider =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new RemoveResourceProvider(resourceProviderId)));
   AWAIT_READY(removeResourceProvider);
   EXPECT_TRUE(removeResourceProvider.get());
-
-  // A removed resource provider cannot be admitted again.
-  Future<bool> admitResourceProvider2 =
-    registrar.get()->apply(Owned<Registrar::Operation>(
-        new AdmitResourceProvider(resourceProviderId)));
-  AWAIT_READY(admitResourceProvider2);
-  EXPECT_FALSE(admitResourceProvider2.get());
 }
 
 
@@ -886,24 +879,19 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
-  Future<bool> admitResourceProvider1 =
+  AWAIT_READY(masterRegistrar.recover(masterInfo));
+
+  Future<bool> admitResourceProvider =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new AdmitResourceProvider(resourceProviderId)));
-  AWAIT_READY(admitResourceProvider1);
-  EXPECT_TRUE(admitResourceProvider1.get());
+  AWAIT_READY(admitResourceProvider);
+  EXPECT_TRUE(admitResourceProvider.get());
 
   Future<bool> removeResourceProvider =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new RemoveResourceProvider(resourceProviderId)));
   AWAIT_READY(removeResourceProvider);
   EXPECT_TRUE(removeResourceProvider.get());
-
-  // A removed resource provider cannot be admitted again.
-  Future<bool> admitResourceProvider2 =
-    registrar.get()->apply(Owned<Registrar::Operation>(
-        new AdmitResourceProvider(resourceProviderId)));
-  AWAIT_READY(admitResourceProvider2);
-  EXPECT_FALSE(admitResourceProvider2.get());
 }
 
 


[6/9] mesos git commit: Revert "Passed on registrar when constructing resource provider manager."

Posted by al...@apache.org.
Revert "Passed on registrar when constructing resource provider manager."

This reverts commit e1e9a041be55182fc4d683d973f0a2cd25919cf1.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b1e49463
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b1e49463
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b1e49463

Branch: refs/heads/master
Commit: b1e4946372a06ebffe309056a3a64bbda1bf875b
Parents: f670e2b
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:09:11 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:09:11 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 15 ++++--------
 src/resource_provider/manager.hpp             |  5 +---
 src/resource_provider/registrar.cpp           |  5 +---
 src/slave/slave.cpp                           | 27 +--------------------
 src/tests/resource_provider_manager_tests.cpp | 28 +++++++---------------
 5 files changed, 17 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b1e49463/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 259a810..68e1866 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -58,7 +58,6 @@ using mesos::internal::resource_provider::validation::call::validate;
 
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
-using mesos::resource_provider::Registrar;
 
 using process::Failure;
 using process::Future;
@@ -158,7 +157,7 @@ class ResourceProviderManagerProcess
   : public Process<ResourceProviderManagerProcess>
 {
 public:
-  ResourceProviderManagerProcess(Owned<Registrar> _registrar);
+  ResourceProviderManagerProcess();
 
   Future<http::Response> api(
       const http::Request& request,
@@ -213,13 +212,9 @@ private:
 };
 
 
-ResourceProviderManagerProcess::ResourceProviderManagerProcess(
-    Owned<Registrar> _registrar)
+ResourceProviderManagerProcess::ResourceProviderManagerProcess()
   : ProcessBase(process::ID::generate("resource-provider-manager")),
-    metrics(*this)
-{
-  CHECK_NOTNULL(_registrar.get());
-}
+    metrics(*this) {}
 
 
 Future<http::Response> ResourceProviderManagerProcess::api(
@@ -768,8 +763,8 @@ ResourceProviderManagerProcess::Metrics::~Metrics()
 }
 
 
-ResourceProviderManager::ResourceProviderManager(Owned<Registrar> registrar)
-  : process(new ResourceProviderManagerProcess(std::move(registrar)))
+ResourceProviderManager::ResourceProviderManager()
+  : process(new ResourceProviderManagerProcess())
 {
   spawn(CHECK_NOTNULL(process.get()));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/b1e49463/src/resource_provider/manager.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index 6c57956..bc017fa 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -26,7 +26,6 @@
 #include "messages/messages.hpp"
 
 #include "resource_provider/message.hpp"
-#include "resource_provider/registrar.hpp"
 
 namespace mesos {
 namespace internal {
@@ -38,9 +37,7 @@ class ResourceProviderManagerProcess;
 class ResourceProviderManager
 {
 public:
-  ResourceProviderManager(
-      process::Owned<resource_provider::Registrar> registrar);
-
+  ResourceProviderManager();
   ~ResourceProviderManager();
 
   ResourceProviderManager(

http://git-wip-us.apache.org/repos/asf/mesos/blob/b1e49463/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index b151e2b..53b403e 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -185,10 +185,7 @@ private:
 GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
   : ProcessBase(process::ID::generate("resource-provider-generic-registrar")),
     storage(std::move(_storage)),
-    state(storage.get())
-{
-  CHECK_NOTNULL(storage.get());
-}
+    state(storage.get()) {}
 
 
 Future<Nothing> GenericRegistrarProcess::recover()

http://git-wip-us.apache.org/repos/asf/mesos/blob/b1e49463/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index 2632772..d313777 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -38,9 +38,6 @@
 
 #include <mesos/module/authenticatee.hpp>
 
-#include <mesos/state/leveldb.hpp>
-#include <mesos/state/in_memory.hpp>
-
 #include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
 
 #include <process/after.hpp>
@@ -8805,29 +8802,7 @@ void Slave::initializeResourceProviderManager(
     return;
   }
 
-  // The registrar uses LevelDB as underlying storage. Since LevelDB
-  // is currently not supported on Windows (see MESOS-5932), we fall
-  // back to in-memory storage there.
-  //
-  // TODO(bbannier): Remove this Windows workaround once MESOS-5932 is fixed.
-#ifndef __WINDOWS__
-  Owned<mesos::state::Storage> storage(new mesos::state::LevelDBStorage(
-      paths::getResourceProviderRegistryPath(flags.work_dir, slaveId)));
-#else
-  LOG(WARNING)
-    << "Persisting resource provider manager state is not supported on Windows";
-  Owned<mesos::state::Storage> storage(new mesos::state::InMemoryStorage());
-#endif // __WINDOWS__
-
-  Try<Owned<resource_provider::Registrar>> resourceProviderRegistrar =
-    resource_provider::Registrar::create(std::move(storage));
-
-  CHECK_SOME(resourceProviderRegistrar)
-    << "Could not construct resource provider registrar: "
-    << resourceProviderRegistrar.error();
-
-  resourceProviderManager.reset(
-      new ResourceProviderManager(std::move(resourceProviderRegistrar.get())));
+  resourceProviderManager.reset(new ResourceProviderManager());
 
   if (capabilities.resourceProvider) {
     // Start listening for messages from the resource provider manager.

http://git-wip-us.apache.org/repos/asf/mesos/blob/b1e49463/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 1664073..ceb7854 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -67,7 +67,6 @@ using mesos::master::detector::MasterDetector;
 
 using mesos::state::InMemoryStorage;
 using mesos::state::State;
-using mesos::state::Storage;
 
 using mesos::resource_provider::AdmitResourceProvider;
 using mesos::resource_provider::Registrar;
@@ -130,8 +129,7 @@ TEST_F(ResourceProviderManagerHttpApiTest, NoContentType)
   request.method = "POST";
   request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
 
-  ResourceProviderManager manager(
-      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+  ResourceProviderManager manager;
 
   Future<http::Response> response = manager.api(request, None());
 
@@ -156,8 +154,7 @@ TEST_F(ResourceProviderManagerHttpApiTest, ValidJsonButInvalidProtobuf)
   request.headers["Content-Type"] = APPLICATION_JSON;
   request.body = stringify(object);
 
-  ResourceProviderManager manager(
-      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+  ResourceProviderManager manager;
 
   Future<http::Response> response = manager.api(request, None());
 
@@ -180,8 +177,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, MalformedContent)
   request.headers["Content-Type"] = stringify(contentType);
   request.body = "MALFORMED_CONTENT";
 
-  ResourceProviderManager manager(
-      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+  ResourceProviderManager manager;
 
   Future<http::Response> response = manager.api(request, None());
 
@@ -227,8 +223,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, UnsupportedContentMediaType)
   request.headers["Content-Type"] = unknownMediaType;
   request.body = serialize(contentType, call);
 
-  ResourceProviderManager manager(
-      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+  ResourceProviderManager manager;
 
   Future<http::Response> response = manager.api(request, None());
 
@@ -240,8 +235,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager(
-      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+  ResourceProviderManager manager;
 
   Option<id::UUID> streamId;
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -348,8 +342,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateOperationStatus)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager(
-      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+  ResourceProviderManager manager;
 
   Option<id::UUID> streamId;
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -467,8 +460,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesSuccess)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager(
-      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+  ResourceProviderManager manager;
 
   Option<id::UUID> streamId;
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -575,8 +567,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesFailure)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager(
-      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+  ResourceProviderManager manager;
 
   Option<id::UUID> streamId;
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -683,8 +674,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesDisconnected)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager(
-      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
+  ResourceProviderManager manager;
 
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
   Option<http::Pipe::Reader> reader;


[8/9] mesos git commit: Revert "Externalized creation of resource provider manager backing storage."

Posted by al...@apache.org.
Revert "Externalized creation of resource provider manager backing storage."

This reverts commit 6f6413b618b4d7aec7c8f8e6fa9e3542f1af2b9c.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/ed92ee4e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/ed92ee4e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/ed92ee4e

Branch: refs/heads/master
Commit: ed92ee4e61c44c4fe81da277bb68cee56c818fa7
Parents: 6d56382
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:09:24 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:09:24 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/registrar.cpp           | 49 ++++++++++++++++++----
 src/resource_provider/registrar.hpp           | 15 +++----
 src/tests/resource_provider_manager_tests.cpp | 27 +++++++++++-
 3 files changed, 75 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/ed92ee4e/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index dbb55dd..92ef9ae 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -27,6 +27,10 @@
 
 #include <mesos/state/in_memory.hpp>
 
+#ifndef __WINDOWS__
+#include <mesos/state/leveldb.hpp>
+#endif // __WINDOWS__
+
 #include <mesos/state/protobuf.hpp>
 
 #include <process/defer.hpp>
@@ -49,6 +53,12 @@ using std::string;
 using mesos::resource_provider::registry::Registry;
 using mesos::resource_provider::registry::ResourceProvider;
 
+using mesos::state::InMemoryStorage;
+
+#ifndef __WINDOWS__
+using mesos::state::LevelDBStorage;
+#endif // __WINDOWS__
+
 using mesos::state::Storage;
 
 using mesos::state::protobuf::Variable;
@@ -86,9 +96,11 @@ bool Registrar::Operation::set()
 }
 
 
-Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage)
+Try<Owned<Registrar>> Registrar::create(
+    const slave::Flags& slaveFlags,
+    const SlaveID& slaveId)
 {
-  return new AgentRegistrar(std::move(storage));
+  return new AgentRegistrar(slaveFlags, slaveId);
 }
 
 
@@ -148,7 +160,7 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
 class AgentRegistrarProcess : public Process<AgentRegistrarProcess>
 {
 public:
-  AgentRegistrarProcess(Owned<Storage> storage);
+  AgentRegistrarProcess(const slave::Flags& flags, const SlaveID& slaveId);
 
   Future<Nothing> recover();
 
@@ -179,12 +191,33 @@ private:
   deque<Owned<Registrar::Operation>> operations;
 
   bool updating = false;
+
+  static Owned<Storage> createStorage(const std::string& path);
 };
 
 
-AgentRegistrarProcess::AgentRegistrarProcess(Owned<Storage> _storage)
+Owned<Storage> AgentRegistrarProcess::createStorage(const std::string& path)
+{
+  // The registrar uses LevelDB as underlying storage. Since LevelDB
+  // is currently not supported on Windows (see MESOS-5932), we fall
+  // back to in-memory storage there.
+  //
+  // TODO(bbannier): Remove this Windows workaround once MESOS-5932 is fixed.
+#ifndef __WINDOWS__
+  return Owned<Storage>(new LevelDBStorage(path));
+#else
+  LOG(WARNING)
+    << "Persisting resource provider manager state is not supported on Windows";
+  return Owned<Storage>(new InMemoryStorage());
+#endif // __WINDOWS__
+}
+
+
+AgentRegistrarProcess::AgentRegistrarProcess(
+    const slave::Flags& flags, const SlaveID& slaveId)
   : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
-    storage(std::move(_storage)),
+    storage(createStorage(slave::paths::getResourceProviderRegistryPath(
+        flags.work_dir, slaveId))),
     state(storage.get()) {}
 
 
@@ -322,8 +355,10 @@ void AgentRegistrarProcess::_update(
 }
 
 
-AgentRegistrar::AgentRegistrar(Owned<Storage> storage)
-  : process(new AgentRegistrarProcess(std::move(storage)))
+AgentRegistrar::AgentRegistrar(
+    const slave::Flags& slaveFlags,
+    const SlaveID& slaveId)
+  : process(new AgentRegistrarProcess(slaveFlags, slaveId))
 {
   process::spawn(process.get(), false);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/ed92ee4e/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
index 34cb166..39f45b0 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -19,8 +19,6 @@
 
 #include <memory>
 
-#include <mesos/state/storage.hpp>
-
 #include <process/future.hpp>
 #include <process/owned.hpp>
 
@@ -66,14 +64,15 @@ public:
     bool success = false;
   };
 
-  // Create a registry on top of generic storage.
-  static Try<process::Owned<Registrar>> create(
-      process::Owned<state::Storage> storage);
-
   // Create a registry on top of a master's persistent state.
   static Try<process::Owned<Registrar>> create(
       mesos::internal::master::Registrar* registrar);
 
+  // Create a registry on top of an agent's persistent state.
+  static Try<process::Owned<Registrar>> create(
+      const mesos::internal::slave::Flags& slaveFlags,
+      const SlaveID& slaveId);
+
   virtual ~Registrar() = default;
 
   virtual process::Future<Nothing> recover() = 0;
@@ -111,7 +110,9 @@ class AgentRegistrarProcess;
 class AgentRegistrar : public Registrar
 {
 public:
-  AgentRegistrar(process::Owned<state::Storage> storage);
+  AgentRegistrar(
+      const mesos::internal::slave::Flags& slaveFlags,
+      const SlaveID& slaveId);
 
   ~AgentRegistrar() override;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/ed92ee4e/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 72e8122..0de4e79 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -825,8 +825,31 @@ TEST_F(ResourceProviderRegistrarTest, AgentRegistrar)
   ResourceProviderID resourceProviderId;
   resourceProviderId.set_value("foo");
 
-  Owned<mesos::state::Storage> storage(new mesos::state::InMemoryStorage());
-  Try<Owned<Registrar>> registrar = Registrar::create(std::move(storage));
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  const slave::Flags flags = CreateSlaveFlags();
+
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
+  ASSERT_SOME(slave);
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  // The agent will send `UpdateSlaveMessage` after it has created its
+  // meta directories. Await the message to make sure the agent
+  // registrar can create its store in the meta hierarchy.
+  AWAIT_READY(updateSlaveMessage);
+
+  Try<Owned<Registrar>> registrar =
+    Registrar::create(flags, slaveRegisteredMessage->slave_id());
 
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());


[7/9] mesos git commit: Revert "Renamed resource provider `AgentRegistrar` to `GenericRegistrar`."

Posted by al...@apache.org.
Revert "Renamed resource provider `AgentRegistrar` to `GenericRegistrar`."

This reverts commit f6becd6d6f3c8cee779903794ebcfb4f68405358.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6d563823
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6d563823
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6d563823

Branch: refs/heads/master
Commit: 6d56382316930f5075b9ae2ba8ff0c71d845af47
Parents: b1e4946
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:09:18 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:09:18 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/registrar.cpp           | 35 +++++++++++-----------
 src/resource_provider/registrar.hpp           | 10 +++----
 src/tests/resource_provider_manager_tests.cpp |  4 +--
 3 files changed, 24 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6d563823/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index 53b403e..dbb55dd 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -88,7 +88,7 @@ bool Registrar::Operation::set()
 
 Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage)
 {
-  return new GenericRegistrar(std::move(storage));
+  return new AgentRegistrar(std::move(storage));
 }
 
 
@@ -145,10 +145,10 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
 }
 
 
-class GenericRegistrarProcess : public Process<GenericRegistrarProcess>
+class AgentRegistrarProcess : public Process<AgentRegistrarProcess>
 {
 public:
-  GenericRegistrarProcess(Owned<Storage> storage);
+  AgentRegistrarProcess(Owned<Storage> storage);
 
   Future<Nothing> recover();
 
@@ -182,13 +182,13 @@ private:
 };
 
 
-GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
-  : ProcessBase(process::ID::generate("resource-provider-generic-registrar")),
+AgentRegistrarProcess::AgentRegistrarProcess(Owned<Storage> _storage)
+  : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
     storage(std::move(_storage)),
     state(storage.get()) {}
 
 
-Future<Nothing> GenericRegistrarProcess::recover()
+Future<Nothing> AgentRegistrarProcess::recover()
 {
   constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";
 
@@ -206,8 +206,7 @@ Future<Nothing> GenericRegistrarProcess::recover()
 }
 
 
-Future<bool> GenericRegistrarProcess::apply(
-    Owned<Registrar::Operation> operation)
+Future<bool> AgentRegistrarProcess::apply(Owned<Registrar::Operation> operation)
 {
   if (recovered.isNone()) {
     return Failure("Attempted to apply the operation before recovering");
@@ -217,7 +216,7 @@ Future<bool> GenericRegistrarProcess::apply(
 }
 
 
-Future<bool> GenericRegistrarProcess::_apply(
+Future<bool> AgentRegistrarProcess::_apply(
     Owned<Registrar::Operation> operation)
 {
   if (error.isSome()) {
@@ -235,7 +234,7 @@ Future<bool> GenericRegistrarProcess::_apply(
 }
 
 
-void GenericRegistrarProcess::update()
+void AgentRegistrarProcess::update()
 {
   CHECK(!updating);
   CHECK_NONE(error);
@@ -276,7 +275,7 @@ void GenericRegistrarProcess::update()
 }
 
 
-void GenericRegistrarProcess::_update(
+void AgentRegistrarProcess::_update(
     const Future<Option<Variable<Registry>>>& store,
     const Registry& updatedRegistry,
     deque<Owned<Registrar::Operation>> applied)
@@ -323,31 +322,31 @@ void GenericRegistrarProcess::_update(
 }
 
 
-GenericRegistrar::GenericRegistrar(Owned<Storage> storage)
-  : process(new GenericRegistrarProcess(std::move(storage)))
+AgentRegistrar::AgentRegistrar(Owned<Storage> storage)
+  : process(new AgentRegistrarProcess(std::move(storage)))
 {
   process::spawn(process.get(), false);
 }
 
 
-GenericRegistrar::~GenericRegistrar()
+AgentRegistrar::~AgentRegistrar()
 {
   process::terminate(*process);
   process::wait(*process);
 }
 
 
-Future<Nothing> GenericRegistrar::recover()
+Future<Nothing> AgentRegistrar::recover()
 {
-  return dispatch(process.get(), &GenericRegistrarProcess::recover);
+  return dispatch(process.get(), &AgentRegistrarProcess::recover);
 }
 
 
-Future<bool> GenericRegistrar::apply(Owned<Operation> operation)
+Future<bool> AgentRegistrar::apply(Owned<Operation> operation)
 {
   return dispatch(
       process.get(),
-      &GenericRegistrarProcess::apply,
+      &AgentRegistrarProcess::apply,
       std::move(operation));
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6d563823/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
index 3c10785..34cb166 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -105,22 +105,22 @@ private:
 };
 
 
-class GenericRegistrarProcess;
+class AgentRegistrarProcess;
 
 
-class GenericRegistrar : public Registrar
+class AgentRegistrar : public Registrar
 {
 public:
-  GenericRegistrar(process::Owned<state::Storage> storage);
+  AgentRegistrar(process::Owned<state::Storage> storage);
 
-  ~GenericRegistrar() override;
+  ~AgentRegistrar() override;
 
   process::Future<Nothing> recover() override;
 
   process::Future<bool> apply(process::Owned<Operation> operation) override;
 
 private:
-  std::unique_ptr<GenericRegistrarProcess> process;
+  std::unique_ptr<AgentRegistrarProcess> process;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/6d563823/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index ceb7854..72e8122 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -819,8 +819,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
 class ResourceProviderRegistrarTest : public tests::MesosTest {};
 
 
-// Test that the generic resource provider registrar works as expected.
-TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
+// Test that the agent resource provider registrar works as expected.
+TEST_F(ResourceProviderRegistrarTest, AgentRegistrar)
 {
   ResourceProviderID resourceProviderId;
   resourceProviderId.set_value("foo");


[5/9] mesos git commit: Revert "Set up recovery code paths of resource provider manager."

Posted by al...@apache.org.
Revert "Set up recovery code paths of resource provider manager."

This reverts commit bfcf5571869598a2e6d75550013fdaefa57dd6cb.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/f670e2b3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/f670e2b3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/f670e2b3

Branch: refs/heads/master
Commit: f670e2b329514282515a5df2b52d0ddbcbb92a8b
Parents: 26626f4
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:09:03 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:09:03 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 323 +++++++++------------
 src/resource_provider/registrar.cpp           |  91 +++---
 src/resource_provider/registrar.hpp           |  18 +-
 src/tests/resource_provider_manager_tests.cpp |  15 +-
 4 files changed, 187 insertions(+), 260 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 6393e7a..259a810 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -36,7 +36,6 @@
 #include <process/metrics/metrics.hpp>
 
 #include <stout/hashmap.hpp>
-#include <stout/nothing.hpp>
 #include <stout/protobuf.hpp>
 #include <stout/uuid.hpp>
 
@@ -48,7 +47,6 @@
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
 
-#include "resource_provider/registry.hpp"
 #include "resource_provider/validation.hpp"
 
 namespace http = process::http;
@@ -62,8 +60,6 @@ using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::Registrar;
 
-using mesos::resource_provider::registry::Registry;
-
 using process::Failure;
 using process::Future;
 using process::Owned;
@@ -80,10 +76,10 @@ using process::wait;
 
 using process::http::Accepted;
 using process::http::BadRequest;
+using process::http::OK;
 using process::http::MethodNotAllowed;
 using process::http::NotAcceptable;
 using process::http::NotImplemented;
-using process::http::OK;
 using process::http::Pipe;
 using process::http::UnsupportedMediaType;
 
@@ -196,11 +192,6 @@ private:
       ResourceProvider* resourceProvider,
       const Call::UpdatePublishResourcesStatus& update);
 
-  Future<Nothing> recover(
-      const mesos::resource_provider::registry::Registry& registry);
-
-  void initialize() override;
-
   ResourceProviderID newResourceProviderId();
 
   double gaugeSubscribed();
@@ -218,9 +209,6 @@ private:
     Gauge subscribed;
   };
 
-  Owned<Registrar> registrar;
-  Promise<Nothing> recovered;
-
   Metrics metrics;
 };
 
@@ -228,191 +216,152 @@ private:
 ResourceProviderManagerProcess::ResourceProviderManagerProcess(
     Owned<Registrar> _registrar)
   : ProcessBase(process::ID::generate("resource-provider-manager")),
-    registrar(std::move(_registrar)),
     metrics(*this)
 {
-  CHECK_NOTNULL(registrar.get());
+  CHECK_NOTNULL(_registrar.get());
 }
 
 
-void ResourceProviderManagerProcess::initialize()
+Future<http::Response> ResourceProviderManagerProcess::api(
+    const http::Request& request,
+    const Option<Principal>& principal)
 {
-  // Recover the registrar.
-  registrar->recover()
-    .then(defer(self(), &ResourceProviderManagerProcess::recover, lambda::_1))
-    .onAny([](const Future<Nothing>& recovered) {
-      if (!recovered.isReady()) {
-        LOG(FATAL)
-        << "Failed to recover resource provider manager registry: "
-        << recovered;
-      }
-    });
-}
+  if (request.method != "POST") {
+    return MethodNotAllowed({"POST"}, request.method);
+  }
 
+  v1::resource_provider::Call v1Call;
 
-Future<Nothing> ResourceProviderManagerProcess::recover(
-    const mesos::resource_provider::registry::Registry& registry)
-{
-  recovered.set(Nothing());
+  // TODO(anand): Content type values are case-insensitive.
+  Option<string> contentType = request.headers.get("Content-Type");
 
-  return Nothing();
-}
+  if (contentType.isNone()) {
+    return BadRequest("Expecting 'Content-Type' to be present");
+  }
 
+  if (contentType.get() == APPLICATION_PROTOBUF) {
+    if (!v1Call.ParseFromString(request.body)) {
+      return BadRequest("Failed to parse body into Call protobuf");
+    }
+  } else if (contentType.get() == APPLICATION_JSON) {
+    Try<JSON::Value> value = JSON::parse(request.body);
+    if (value.isError()) {
+      return BadRequest("Failed to parse body into JSON: " + value.error());
+    }
 
-Future<http::Response> ResourceProviderManagerProcess::api(
-    const http::Request& request,
-    const Option<Principal>& principal)
-{
-  // TODO(bbannier): This implementation does not limit the number of messages
-  // in the actor's inbox which could become large should a big number of
-  // resource providers attempt to subscribe before recovery completed. Consider
-  // rejecting requests until the resource provider manager has recovered. This
-  // would likely require implementing retry logic in resource providers.
-  return recovered.future().then(defer(
-      self(), [this, request, principal](const Nothing&) -> http::Response {
-        if (request.method != "POST") {
-          return MethodNotAllowed({"POST"}, request.method);
-        }
-
-        v1::resource_provider::Call v1Call;
-
-        // TODO(anand): Content type values are case-insensitive.
-        Option<string> contentType = request.headers.get("Content-Type");
-
-        if (contentType.isNone()) {
-          return BadRequest("Expecting 'Content-Type' to be present");
-        }
-
-        if (contentType.get() == APPLICATION_PROTOBUF) {
-          if (!v1Call.ParseFromString(request.body)) {
-            return BadRequest("Failed to parse body into Call protobuf");
-          }
-        } else if (contentType.get() == APPLICATION_JSON) {
-          Try<JSON::Value> value = JSON::parse(request.body);
-          if (value.isError()) {
-            return BadRequest(
-                "Failed to parse body into JSON: " + value.error());
-          }
-
-          Try<v1::resource_provider::Call> parse =
-            ::protobuf::parse<v1::resource_provider::Call>(value.get());
-
-          if (parse.isError()) {
-            return BadRequest(
-                "Failed to convert JSON into Call protobuf: " + parse.error());
-          }
-
-          v1Call = parse.get();
-        } else {
-          return UnsupportedMediaType(
-              string("Expecting 'Content-Type' of ") + APPLICATION_JSON +
-              " or " + APPLICATION_PROTOBUF);
-        }
-
-        Call call = devolve(v1Call);
-
-        Option<Error> error = validate(call);
-        if (error.isSome()) {
-          return BadRequest(
-              "Failed to validate resource_provider::Call: " + error->message);
-        }
-
-        if (call.type() == Call::SUBSCRIBE) {
-          // We default to JSON 'Content-Type' in the response since an empty
-          // 'Accept' header results in all media types considered acceptable.
-          ContentType acceptType = ContentType::JSON;
-
-          if (request.acceptsMediaType(APPLICATION_JSON)) {
-            acceptType = ContentType::JSON;
-          } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
-            acceptType = ContentType::PROTOBUF;
-          } else {
-            return NotAcceptable(
-                string("Expecting 'Accept' to allow ") + "'" +
-                APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
-          }
-
-          if (request.headers.contains("Mesos-Stream-Id")) {
-            return BadRequest(
-                "Subscribe calls should not include the 'Mesos-Stream-Id' "
-                "header");
-          }
-
-          Pipe pipe;
-          OK ok;
-
-          ok.headers["Content-Type"] = stringify(acceptType);
-          ok.type = http::Response::PIPE;
-          ok.reader = pipe.reader();
-
-          // Generate a stream ID and return it in the response.
-          id::UUID streamId = id::UUID::random();
-          ok.headers["Mesos-Stream-Id"] = streamId.toString();
-
-          HttpConnection http(pipe.writer(), acceptType, streamId);
-          this->subscribe(http, call.subscribe());
-
-          return std::move(ok);
-        }
-
-        if (!this->resourceProviders.subscribed.contains(
-                call.resource_provider_id())) {
-          return BadRequest("Resource provider is not subscribed");
-        }
-
-        ResourceProvider* resourceProvider =
-          this->resourceProviders.subscribed.at(call.resource_provider_id())
-            .get();
-
-        // This isn't a `SUBSCRIBE` call, so the request should include a stream
-        // ID.
-        if (!request.headers.contains("Mesos-Stream-Id")) {
-          return BadRequest(
-              "All non-subscribe calls should include to 'Mesos-Stream-Id' "
-              "header");
-        }
-
-        const string& streamId = request.headers.at("Mesos-Stream-Id");
-        if (streamId != resourceProvider->http.streamId.toString()) {
-          return BadRequest(
-              "The stream ID '" + streamId +
-              "' included in this request "
-              "didn't match the stream ID currently associated with "
-              " resource provider ID " +
-              resourceProvider->info.id().value());
-        }
-
-        switch (call.type()) {
-          case Call::UNKNOWN: {
-            return NotImplemented();
-          }
-
-          case Call::SUBSCRIBE: {
-            // `SUBSCRIBE` call should have been handled above.
-            LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
-          }
-
-          case Call::UPDATE_OPERATION_STATUS: {
-            this->updateOperationStatus(
-                resourceProvider, call.update_operation_status());
-
-            return Accepted();
-          }
-
-          case Call::UPDATE_STATE: {
-            this->updateState(resourceProvider, call.update_state());
-            return Accepted();
-          }
-
-          case Call::UPDATE_PUBLISH_RESOURCES_STATUS: {
-            this->updatePublishResourcesStatus(
-                resourceProvider, call.update_publish_resources_status());
-            return Accepted();
-          }
-        }
-
-        UNREACHABLE();
-      }));
+    Try<v1::resource_provider::Call> parse =
+      ::protobuf::parse<v1::resource_provider::Call>(value.get());
+
+    if (parse.isError()) {
+      return BadRequest("Failed to convert JSON into Call protobuf: " +
+                        parse.error());
+    }
+
+    v1Call = parse.get();
+  } else {
+    return UnsupportedMediaType(
+        string("Expecting 'Content-Type' of ") +
+        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
+  }
+
+  Call call = devolve(v1Call);
+
+  Option<Error> error = validate(call);
+  if (error.isSome()) {
+    return BadRequest(
+        "Failed to validate resource_provider::Call: " + error->message);
+  }
+
+  if (call.type() == Call::SUBSCRIBE) {
+    // We default to JSON 'Content-Type' in the response since an empty
+    // 'Accept' header results in all media types considered acceptable.
+    ContentType acceptType = ContentType::JSON;
+
+    if (request.acceptsMediaType(APPLICATION_JSON)) {
+      acceptType = ContentType::JSON;
+    } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
+      acceptType = ContentType::PROTOBUF;
+    } else {
+      return NotAcceptable(
+          string("Expecting 'Accept' to allow ") +
+          "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
+    }
+
+    if (request.headers.contains("Mesos-Stream-Id")) {
+      return BadRequest(
+          "Subscribe calls should not include the 'Mesos-Stream-Id' header");
+    }
+
+    Pipe pipe;
+    OK ok;
+
+    ok.headers["Content-Type"] = stringify(acceptType);
+    ok.type = http::Response::PIPE;
+    ok.reader = pipe.reader();
+
+    // Generate a stream ID and return it in the response.
+    id::UUID streamId = id::UUID::random();
+    ok.headers["Mesos-Stream-Id"] = streamId.toString();
+
+    HttpConnection http(pipe.writer(), acceptType, streamId);
+    subscribe(http, call.subscribe());
+
+    return ok;
+  }
+
+  if (!resourceProviders.subscribed.contains(call.resource_provider_id())) {
+    return BadRequest("Resource provider is not subscribed");
+  }
+
+  ResourceProvider* resourceProvider =
+    resourceProviders.subscribed.at(call.resource_provider_id()).get();
+
+  // This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
+  if (!request.headers.contains("Mesos-Stream-Id")) {
+    return BadRequest(
+        "All non-subscribe calls should include to 'Mesos-Stream-Id' header");
+  }
+
+  const string& streamId = request.headers.at("Mesos-Stream-Id");
+  if (streamId != resourceProvider->http.streamId.toString()) {
+    return BadRequest(
+        "The stream ID '" + streamId + "' included in this request "
+        "didn't match the stream ID currently associated with "
+        " resource provider ID " + resourceProvider->info.id().value());
+  }
+
+  switch(call.type()) {
+    case Call::UNKNOWN: {
+      return NotImplemented();
+    }
+
+    case Call::SUBSCRIBE: {
+      // `SUBSCRIBE` call should have been handled above.
+      LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
+    }
+
+    case Call::UPDATE_OPERATION_STATUS: {
+      updateOperationStatus(
+          resourceProvider,
+          call.update_operation_status());
+
+      return Accepted();
+    }
+
+    case Call::UPDATE_STATE: {
+      updateState(resourceProvider, call.update_state());
+      return Accepted();
+    }
+
+    case Call::UPDATE_PUBLISH_RESOURCES_STATUS: {
+      updatePublishResourcesStatus(
+          resourceProvider,
+          call.update_publish_resources_status());
+      return Accepted();
+    }
+  }
+
+  UNREACHABLE();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index d7ec6a6..b151e2b 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -92,11 +92,9 @@ Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage)
 }
 
 
-Try<Owned<Registrar>> Registrar::create(
-    master::Registrar* registrar,
-    Registry registry)
+Try<Owned<Registrar>> Registrar::create(master::Registrar* registrar)
 {
-  return new MasterRegistrar(registrar, std::move(registry));
+  return new MasterRegistrar(registrar);
 }
 
 
@@ -152,29 +150,28 @@ class GenericRegistrarProcess : public Process<GenericRegistrarProcess>
 public:
   GenericRegistrarProcess(Owned<Storage> storage);
 
-  Future<Registry> recover();
+  Future<Nothing> recover();
 
   Future<bool> apply(Owned<Registrar::Operation> operation);
 
-  void update();
-
-  void initialize() override;
-
-private:
   Future<bool> _apply(Owned<Registrar::Operation> operation);
 
+  void update();
+
   void _update(
       const Future<Option<Variable<Registry>>>& store,
+      const Registry& updatedRegistry,
       deque<Owned<Registrar::Operation>> applied);
 
-
+private:
   Owned<Storage> storage;
 
   // Use fully qualified type for `State` to disambiguate with `State`
   // enumeration in `ProcessBase`.
   mesos::state::protobuf::State state;
 
-  Promise<Nothing> recovered;
+  Option<Future<Nothing>> recovered;
+  Option<Registry> registry;
   Option<Variable<Registry>> variable;
 
   Option<Error> error;
@@ -194,36 +191,32 @@ GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
 }
 
 
-void GenericRegistrarProcess::initialize()
+Future<Nothing> GenericRegistrarProcess::recover()
 {
   constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";
 
-  CHECK_NONE(variable);
-
-  recovered.associate(state.fetch<Registry>(NAME).then(
-      defer(self(), [this](const Variable<Registry>& recovery) {
-        variable = recovery;
-        return Nothing();
-      })));
-}
+  if (recovered.isNone()) {
+    recovered = state.fetch<Registry>(NAME).then(
+        defer(self(), [this](const Variable<Registry>& recovery) {
+          registry = recovery.get();
+          variable = recovery;
 
+          return Nothing();
+        }));
+  }
 
-Future<Registry> GenericRegistrarProcess::recover()
-{
-  // Prevent discards on the returned `Future` by marking the result as
-  // `undiscardable` so that we control the lifetime of the recovering registry.
-  return undiscardable(recovered.future()).then([this](const Nothing&) {
-    CHECK_SOME(this->variable);
-    return this->variable->get();
-  });
+  return recovered.get();
 }
 
 
 Future<bool> GenericRegistrarProcess::apply(
     Owned<Registrar::Operation> operation)
 {
-  return undiscardable(recovered.future()).then(
-      defer(self(), &Self::_apply, std::move(operation)));
+  if (recovered.isNone()) {
+    return Failure("Attempted to apply the operation before recovering");
+  }
+
+  return recovered->then(defer(self(), &Self::_apply, std::move(operation)));
 }
 
 
@@ -256,9 +249,8 @@ void GenericRegistrarProcess::update()
 
   updating = true;
 
-  CHECK_SOME(variable);
-
-  Registry updatedRegistry = variable->get();
+  CHECK_SOME(registry);
+  Registry updatedRegistry = registry.get();
 
   foreach (Owned<Registrar::Operation>& operation, operations) {
     Try<bool> operationResult = (*operation)(&updatedRegistry);
@@ -280,6 +272,7 @@ void GenericRegistrarProcess::update()
       self(),
       &Self::_update,
       lambda::_1,
+      updatedRegistry,
       std::move(operations)));
 
   operations.clear();
@@ -288,6 +281,7 @@ void GenericRegistrarProcess::update()
 
 void GenericRegistrarProcess::_update(
     const Future<Option<Variable<Registry>>>& store,
+    const Registry& updatedRegistry,
     deque<Owned<Registrar::Operation>> applied)
 {
   updating = false;
@@ -316,6 +310,7 @@ void GenericRegistrarProcess::_update(
   }
 
   variable = store->get();
+  registry = updatedRegistry;
 
   // Remove the operations.
   while (!applied.empty()) {
@@ -345,7 +340,7 @@ GenericRegistrar::~GenericRegistrar()
 }
 
 
-Future<Registry> GenericRegistrar::recover()
+Future<Nothing> GenericRegistrar::recover()
 {
   return dispatch(process.get(), &GenericRegistrarProcess::recover);
 }
@@ -369,8 +364,6 @@ class MasterRegistrarProcess : public Process<MasterRegistrarProcess>
   public:
     AdaptedOperation(Owned<Registrar::Operation> operation);
 
-    Future<registry::Registry> recover();
-
   private:
     Try<bool> perform(internal::Registry* registry, hashset<SlaveID>*) override;
 
@@ -383,17 +376,12 @@ class MasterRegistrarProcess : public Process<MasterRegistrarProcess>
   };
 
 public:
-  explicit MasterRegistrarProcess(
-      master::Registrar* registrar,
-      Registry registry);
+  explicit MasterRegistrarProcess(master::Registrar* registrar);
 
   Future<bool> apply(Owned<Registrar::Operation> operation);
 
-  Future<registry::Registry> recover() { return registry; }
-
 private:
   master::Registrar* registrar = nullptr;
-  Registry registry;
 };
 
 
@@ -410,12 +398,9 @@ Try<bool> MasterRegistrarProcess::AdaptedOperation::perform(
 }
 
 
-MasterRegistrarProcess::MasterRegistrarProcess(
-    master::Registrar* _registrar,
-    registry::Registry _registry)
+MasterRegistrarProcess::MasterRegistrarProcess(master::Registrar* _registrar)
   : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
-    registrar(_registrar),
-    registry(std::move(_registry)) {}
+    registrar(_registrar) {}
 
 
 Future<bool> MasterRegistrarProcess::apply(
@@ -428,10 +413,8 @@ Future<bool> MasterRegistrarProcess::apply(
 }
 
 
-MasterRegistrar::MasterRegistrar(
-    master::Registrar* registrar,
-    registry::Registry registry)
-  : process(new MasterRegistrarProcess(registrar, std::move(registry)))
+MasterRegistrar::MasterRegistrar(master::Registrar* registrar)
+  : process(new MasterRegistrarProcess(registrar))
 {
   spawn(process.get(), false);
 }
@@ -444,9 +427,9 @@ MasterRegistrar::~MasterRegistrar()
 }
 
 
-Future<Registry> MasterRegistrar::recover()
+Future<Nothing> MasterRegistrar::recover()
 {
-  return dispatch(process.get(), &MasterRegistrarProcess::recover);
+  return Nothing();
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
index ded56e1..3c10785 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -71,16 +71,12 @@ public:
       process::Owned<state::Storage> storage);
 
   // Create a registry on top of a master's persistent state.
-  //
-  // The created registrar does not take ownership of the passed registrar
-  // which needs to be valid as long as the created registrar is alive.
   static Try<process::Owned<Registrar>> create(
-      mesos::internal::master::Registrar* registrar,
-      registry::Registry registry);
+      mesos::internal::master::Registrar* registrar);
 
   virtual ~Registrar() = default;
 
-  virtual process::Future<registry::Registry> recover() = 0;
+  virtual process::Future<Nothing> recover() = 0;
   virtual process::Future<bool> apply(process::Owned<Operation> operation) = 0;
 };
 
@@ -119,7 +115,7 @@ public:
 
   ~GenericRegistrar() override;
 
-  process::Future<registry::Registry> recover() override;
+  process::Future<Nothing> recover() override;
 
   process::Future<bool> apply(process::Owned<Operation> operation) override;
 
@@ -134,17 +130,13 @@ class MasterRegistrarProcess;
 class MasterRegistrar : public Registrar
 {
 public:
-  // The created registrar does not take ownership of the passed registrar
-  // which needs to be valid as long as the created registrar is alive.
-  explicit MasterRegistrar(
-      mesos::internal::master::Registrar* registrar,
-      registry::Registry registry);
+  explicit MasterRegistrar(mesos::internal::master::Registrar* Registrar);
 
   ~MasterRegistrar() override;
 
   // This registrar performs no recovery; instead to recover
   // the underlying master registrar needs to be recovered.
-  process::Future<registry::Registry> recover() override;
+  process::Future<Nothing> recover() override;
 
   process::Future<bool> apply(process::Owned<Operation> operation) override;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/f670e2b3/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index eb8e4fc..1664073 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -841,6 +841,10 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
+  // Applying operations on a not yet recovered registrar fails.
+  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
+      new AdmitResourceProvider(resourceProviderId))));
+
   AWAIT_READY(registrar.get()->recover());
 
   Future<bool> admitResourceProvider =
@@ -869,16 +873,15 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
 
   const MasterInfo masterInfo = protobuf::createMasterInfo({});
 
-  Future<Registry> registry = masterRegistrar.recover(masterInfo);
-  AWAIT_READY(registry);
-
-  Try<Owned<Registrar>> registrar = Registrar::create(
-      &masterRegistrar,
-      registry->resource_provider_registry());
+  Try<Owned<Registrar>> registrar = Registrar::create(&masterRegistrar);
 
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
+  // Applying operations on a not yet recovered registrar fails.
+  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
+      new AdmitResourceProvider(resourceProviderId))));
+
   AWAIT_READY(masterRegistrar.recover(masterInfo));
 
   Future<bool> admitResourceProvider =


[4/9] mesos git commit: Revert "Remembered recovered and subscribed providers in ephemeral state."

Posted by al...@apache.org.
Revert "Remembered recovered and subscribed providers in ephemeral state."

This reverts commit ac870c00ab11ccc14b9c872c78130ef4d568c0ee.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/26626f42
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/26626f42
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/26626f42

Branch: refs/heads/master
Commit: 26626f42e974ec012a276f08006231723f72e51b
Parents: 2855a6b
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:08:57 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:08:57 2018 +0200

----------------------------------------------------------------------
 src/resource_provider/manager.cpp | 22 ----------------------
 1 file changed, 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/26626f42/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 32f23a7..6393e7a 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -208,10 +208,6 @@ private:
   struct ResourceProviders
   {
     hashmap<ResourceProviderID, Owned<ResourceProvider>> subscribed;
-    hashmap<
-        ResourceProviderID,
-        mesos::resource_provider::registry::ResourceProvider>
-      known;
   } resourceProviders;
 
   struct Metrics
@@ -257,13 +253,6 @@ void ResourceProviderManagerProcess::initialize()
 Future<Nothing> ResourceProviderManagerProcess::recover(
     const mesos::resource_provider::registry::Registry& registry)
 {
-  foreach (
-      const mesos::resource_provider::registry::ResourceProvider&
-        resourceProvider,
-      registry.resource_providers()) {
-    resourceProviders.known.put(resourceProvider.id(), resourceProvider);
-  }
-
   recovered.set(Nothing());
 
   return Nothing();
@@ -708,17 +697,6 @@ void ResourceProviderManagerProcess::subscribe(
   resourceProviders.subscribed.put(
       resourceProviderId,
       std::move(resourceProvider));
-
-  if (!resourceProviders.known.contains(resourceProviderId)) {
-    mesos::resource_provider::registry::ResourceProvider resourceProvider_;
-    resourceProvider_.mutable_id()->CopyFrom(resourceProviderId);
-
-    resourceProviders.known.put(
-        resourceProviderId,
-        std::move(resourceProvider_));
-
-    // TODO(bbannier): Persist this information in the registry.
-  }
 }
 
 


[9/9] mesos git commit: Revert "Delayed construction of the agent's resource provider manager."

Posted by al...@apache.org.
Revert "Delayed construction of the agent's resource provider manager."

This reverts commit 0424a6623d08440d8dbe5aff5ec2f18df7b93e24.


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a1c6a7a3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a1c6a7a3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a1c6a7a3

Branch: refs/heads/master
Commit: a1c6a7a3c54518f97a34f28b1792885b928b948c
Parents: ed92ee4
Author: Alexander Rukletsov <al...@apache.org>
Authored: Wed Apr 25 17:09:33 2018 +0200
Committer: Alexander Rukletsov <al...@apache.org>
Committed: Wed Apr 25 17:09:33 2018 +0200

----------------------------------------------------------------------
 src/slave/slave.cpp                           | 94 +++++-----------------
 src/slave/slave.hpp                           |  6 +-
 src/tests/resource_provider_manager_tests.cpp |  9 +--
 3 files changed, 22 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d313777..d0ff5f8 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -30,8 +30,6 @@
 #include <utility>
 #include <vector>
 
-#include <glog/logging.h>
-
 #include <mesos/type_utils.hpp>
 
 #include <mesos/authentication/secret_generator.hpp>
@@ -774,20 +772,15 @@ void Slave::initialize()
           logRequest(request);
           return http.executor(request, principal);
         });
-  route(
-      "/api/v1/resource_provider",
-      READWRITE_HTTP_AUTHENTICATION_REALM,
-      Http::RESOURCE_PROVIDER_HELP(),
-      [this](const http::Request& request, const Option<Principal>& principal)
-        -> Future<http::Response> {
-        logRequest(request);
-
-        if (resourceProviderManager.get() == nullptr) {
-          return http::ServiceUnavailable();
-        }
 
-        return resourceProviderManager->api(request, principal);
-      });
+  route("/api/v1/resource_provider",
+        READWRITE_HTTP_AUTHENTICATION_REALM,
+        Http::RESOURCE_PROVIDER_HELP(),
+        [this](const http::Request& request,
+               const Option<Principal>& principal) {
+          logRequest(request);
+          return resourceProviderManager.api(request, principal);
+        });
 
   // TODO(ijimenez): Remove this endpoint at the end of the
   // deprecation cycle on 0.26.
@@ -1509,8 +1502,6 @@ void Slave::registered(
 
       CHECK_SOME(state::checkpoint(path, info));
 
-      initializeResourceProviderManager(flags, info.id());
-
       // We start the local resource providers daemon once the agent is
       // running, so the resource providers can use the agent API.
       localResourceProviderDaemon->start(info.id());
@@ -4353,7 +4344,7 @@ void Slave::applyOperation(const ApplyOperationMessage& message)
   }
 
   if (resourceProviderId.isSome()) {
-    CHECK_NOTNULL(resourceProviderManager.get())->applyOperation(message);
+    resourceProviderManager.applyOperation(message);
     return;
   }
 
@@ -4426,7 +4417,7 @@ void Slave::reconcileOperations(const ReconcileOperationsMessage& message)
   }
 
   if (containsResourceProviderOperations) {
-    CHECK_NOTNULL(resourceProviderManager.get())->reconcileOperations(message);
+    resourceProviderManager.reconcileOperations(message);
   }
 }
 
@@ -4558,19 +4549,7 @@ void Slave::operationStatusAcknowledgement(
 {
   Operation* operation = getOperation(acknowledgement.operation_uuid());
   if (operation != nullptr) {
-    // If the operation was on resource provider resources forward the
-    // acknowledgement to the resource provider manager as well.
-    Result<ResourceProviderID> resourceProviderId =
-      getResourceProviderId(operation->info());
-
-    CHECK(!resourceProviderId.isError())
-      << "Could not determine resource provider of operation " << operation
-      << ": " << resourceProviderId.error();
-
-    if (resourceProviderId.isSome()) {
-      CHECK_NOTNULL(resourceProviderManager.get())
-        ->acknowledgeOperationStatus(acknowledgement);
-    }
+    resourceProviderManager.acknowledgeOperationStatus(acknowledgement);
 
     CHECK(operation->statuses_size() > 0);
     if (protobuf::isTerminalState(
@@ -7340,8 +7319,10 @@ void Slave::__recover(const Future<Nothing>& future)
     detection = detector->detect()
       .onAny(defer(self(), &Slave::detected, lambda::_1));
 
-    if (info.has_id()) {
-      initializeResourceProviderManager(flags, info.id());
+    if (capabilities.resourceProvider) {
+      // Start listening for messages from the resource provider manager.
+      resourceProviderManager.messages().get().onAny(
+          defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
     }
 
     // Forward oversubscribed resources.
@@ -7619,7 +7600,7 @@ void Slave::handleResourceProviderMessage(
                << (message.isFailed() ? message.failure() : "future discarded");
 
     // Wait for the next message.
-    CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
+    resourceProviderManager.messages().get()
       .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
 
     return;
@@ -7878,7 +7859,7 @@ void Slave::handleResourceProviderMessage(
   }
 
   // Wait for the next message.
-  CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
+  resourceProviderManager.messages().get()
     .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
 }
 
@@ -8133,24 +8114,6 @@ void Slave::apply(Operation* operation)
 Future<Nothing> Slave::publishResources(
     const Option<Resources>& additionalResources)
 {
-  // If the resource provider manager has not been created yet no resource
-  // providers have been added and we do not need to publish anything.
-  if (resourceProviderManager == nullptr) {
-    // We check whether the passed additional resources are compatible
-    // with the expectation that no resource provider resources are in
-    // use, yet. This is not an exhaustive consistency check.
-    if (additionalResources.isSome()) {
-      foreach (const Resource& resource, additionalResources.get()) {
-        CHECK(!resource.has_provider_id())
-          << "Cannot publish resource provider resources "
-          << additionalResources.get()
-          << " until resource providers have subscribed";
-      }
-    }
-
-    return Nothing();
-  }
-
   Resources resources;
 
   // NOTE: For resources providers that serve quantity-based resources
@@ -8171,8 +8134,7 @@ Future<Nothing> Slave::publishResources(
     resources += additionalResources.get();
   }
 
-  return CHECK_NOTNULL(resourceProviderManager.get())
-    ->publishResources(resources);
+  return resourceProviderManager.publishResources(resources);
 }
 
 
@@ -8792,26 +8754,6 @@ double Slave::_resources_revocable_percent(const string& name)
 }
 
 
-void Slave::initializeResourceProviderManager(
-    const Flags& flags,
-    const SlaveID& slaveId)
-{
-  // To simplify reasoning about lifetimes we do not allow
-  // reinitialization of the resource provider manager.
-  if (resourceProviderManager.get() != nullptr) {
-    return;
-  }
-
-  resourceProviderManager.reset(new ResourceProviderManager());
-
-  if (capabilities.resourceProvider) {
-    // Start listening for messages from the resource provider manager.
-    resourceProviderManager->messages().get().onAny(
-        defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
-  }
-}
-
-
 Framework::Framework(
     Slave* _slave,
     const Flags& slaveFlags,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index c3866c6..c35996b 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -711,10 +711,6 @@ private:
       const SlaveInfo& previous,
       const SlaveInfo& current) const;
 
-  void initializeResourceProviderManager(
-      const Flags& flags,
-      const SlaveID& slaveId);
-
   protobuf::master::Capabilities requiredMasterCapabilities;
 
   const Flags flags;
@@ -816,7 +812,7 @@ private:
   // (allocated and oversubscribable) resources.
   Option<Resources> oversubscribedResources;
 
-  process::Owned<ResourceProviderManager> resourceProviderManager;
+  ResourceProviderManager resourceProviderManager;
   process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
 
   // Local resource providers known by the agent.

http://git-wip-us.apache.org/repos/asf/mesos/blob/a1c6a7a3/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 0de4e79..c52541b 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -755,17 +755,14 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Owned<MasterDetector> detector = master.get()->createDetector();
+  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
 
-  // For the agent's resource provider manager to start,
-  // the agent needs to have been assigned an agent ID.
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+  Owned<MasterDetector> detector = master.get()->createDetector();
 
   Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
   ASSERT_SOME(agent);
 
-  AWAIT_READY(slaveRegisteredMessage);
+  AWAIT_READY(__recover);
 
   // Wait for recovery to be complete.
   Clock::pause();