You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by ch...@apache.org on 2018/05/02 01:35:49 UTC

[1/9] mesos git commit: Delayed construction of the agent's resource provider manager.

Repository: mesos
Updated Branches:
  refs/heads/master 1c6d9e5e6 -> d4a903a4a


Delayed construction of the agent's resource provider manager.

By delaying the construction of the agent's resource provider manager,
we prepare for a follow-up patch that introduces a dependency of the
resource provider manager on the agent's ID.

Depending on whether the agent was able to recover an agent ID from
its log or still needs to obtain one in its first registration with
the master, the resource provider manager can only be constructed at
different points in the initialization of the agent. To capture the
common code, we introduce a helper function performing the necessary
steps.

Review: https://reviews.apache.org/r/66308/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/6850353e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/6850353e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/6850353e

Branch: refs/heads/master
Commit: 6850353e4146ae82de2a2b98d3ce77c5328f3b26
Parents: 1c6d9e5
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue May 1 13:08:48 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue May 1 13:08:48 2018 -0700

----------------------------------------------------------------------
 src/slave/slave.cpp                           | 94 +++++++++++++++++-----
 src/slave/slave.hpp                           |  6 +-
 src/tests/resource_provider_manager_tests.cpp |  9 ++-
 3 files changed, 87 insertions(+), 22 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/6850353e/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d0ff5f8..d313777 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -30,6 +30,8 @@
 #include <utility>
 #include <vector>
 
+#include <glog/logging.h>
+
 #include <mesos/type_utils.hpp>
 
 #include <mesos/authentication/secret_generator.hpp>
@@ -772,15 +774,20 @@ void Slave::initialize()
           logRequest(request);
           return http.executor(request, principal);
         });
+  route(
+      "/api/v1/resource_provider",
+      READWRITE_HTTP_AUTHENTICATION_REALM,
+      Http::RESOURCE_PROVIDER_HELP(),
+      [this](const http::Request& request, const Option<Principal>& principal)
+        -> Future<http::Response> {
+        logRequest(request);
+
+        if (resourceProviderManager.get() == nullptr) {
+          return http::ServiceUnavailable();
+        }
 
-  route("/api/v1/resource_provider",
-        READWRITE_HTTP_AUTHENTICATION_REALM,
-        Http::RESOURCE_PROVIDER_HELP(),
-        [this](const http::Request& request,
-               const Option<Principal>& principal) {
-          logRequest(request);
-          return resourceProviderManager.api(request, principal);
-        });
+        return resourceProviderManager->api(request, principal);
+      });
 
   // TODO(ijimenez): Remove this endpoint at the end of the
   // deprecation cycle on 0.26.
@@ -1502,6 +1509,8 @@ void Slave::registered(
 
       CHECK_SOME(state::checkpoint(path, info));
 
+      initializeResourceProviderManager(flags, info.id());
+
       // We start the local resource providers daemon once the agent is
       // running, so the resource providers can use the agent API.
       localResourceProviderDaemon->start(info.id());
@@ -4344,7 +4353,7 @@ void Slave::applyOperation(const ApplyOperationMessage& message)
   }
 
   if (resourceProviderId.isSome()) {
-    resourceProviderManager.applyOperation(message);
+    CHECK_NOTNULL(resourceProviderManager.get())->applyOperation(message);
     return;
   }
 
@@ -4417,7 +4426,7 @@ void Slave::reconcileOperations(const ReconcileOperationsMessage& message)
   }
 
   if (containsResourceProviderOperations) {
-    resourceProviderManager.reconcileOperations(message);
+    CHECK_NOTNULL(resourceProviderManager.get())->reconcileOperations(message);
   }
 }
 
@@ -4549,7 +4558,19 @@ void Slave::operationStatusAcknowledgement(
 {
   Operation* operation = getOperation(acknowledgement.operation_uuid());
   if (operation != nullptr) {
-    resourceProviderManager.acknowledgeOperationStatus(acknowledgement);
+    // If the operation was on resource provider resources forward the
+    // acknowledgement to the resource provider manager as well.
+    Result<ResourceProviderID> resourceProviderId =
+      getResourceProviderId(operation->info());
+
+    CHECK(!resourceProviderId.isError())
+      << "Could not determine resource provider of operation " << operation
+      << ": " << resourceProviderId.error();
+
+    if (resourceProviderId.isSome()) {
+      CHECK_NOTNULL(resourceProviderManager.get())
+        ->acknowledgeOperationStatus(acknowledgement);
+    }
 
     CHECK(operation->statuses_size() > 0);
     if (protobuf::isTerminalState(
@@ -7319,10 +7340,8 @@ void Slave::__recover(const Future<Nothing>& future)
     detection = detector->detect()
       .onAny(defer(self(), &Slave::detected, lambda::_1));
 
-    if (capabilities.resourceProvider) {
-      // Start listening for messages from the resource provider manager.
-      resourceProviderManager.messages().get().onAny(
-          defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+    if (info.has_id()) {
+      initializeResourceProviderManager(flags, info.id());
     }
 
     // Forward oversubscribed resources.
@@ -7600,7 +7619,7 @@ void Slave::handleResourceProviderMessage(
                << (message.isFailed() ? message.failure() : "future discarded");
 
     // Wait for the next message.
-    resourceProviderManager.messages().get()
+    CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
       .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
 
     return;
@@ -7859,7 +7878,7 @@ void Slave::handleResourceProviderMessage(
   }
 
   // Wait for the next message.
-  resourceProviderManager.messages().get()
+  CHECK_NOTNULL(resourceProviderManager.get())->messages().get()
     .onAny(defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
 }
 
@@ -8114,6 +8133,24 @@ void Slave::apply(Operation* operation)
 Future<Nothing> Slave::publishResources(
     const Option<Resources>& additionalResources)
 {
+  // If the resource provider manager has not been created yet no resource
+  // providers have been added and we do not need to publish anything.
+  if (resourceProviderManager == nullptr) {
+    // We check whether the passed additional resources are compatible
+    // with the expectation that no resource provider resources are in
+    // use, yet. This is not an exhaustive consistency check.
+    if (additionalResources.isSome()) {
+      foreach (const Resource& resource, additionalResources.get()) {
+        CHECK(!resource.has_provider_id())
+          << "Cannot publish resource provider resources "
+          << additionalResources.get()
+          << " until resource providers have subscribed";
+      }
+    }
+
+    return Nothing();
+  }
+
   Resources resources;
 
   // NOTE: For resources providers that serve quantity-based resources
@@ -8134,7 +8171,8 @@ Future<Nothing> Slave::publishResources(
     resources += additionalResources.get();
   }
 
-  return resourceProviderManager.publishResources(resources);
+  return CHECK_NOTNULL(resourceProviderManager.get())
+    ->publishResources(resources);
 }
 
 
@@ -8754,6 +8792,26 @@ double Slave::_resources_revocable_percent(const string& name)
 }
 
 
+void Slave::initializeResourceProviderManager(
+    const Flags& flags,
+    const SlaveID& slaveId)
+{
+  // To simplify reasoning about lifetimes we do not allow
+  // reinitialization of the resource provider manager.
+  if (resourceProviderManager.get() != nullptr) {
+    return;
+  }
+
+  resourceProviderManager.reset(new ResourceProviderManager());
+
+  if (capabilities.resourceProvider) {
+    // Start listening for messages from the resource provider manager.
+    resourceProviderManager->messages().get().onAny(
+        defer(self(), &Self::handleResourceProviderMessage, lambda::_1));
+  }
+}
+
+
 Framework::Framework(
     Slave* _slave,
     const Flags& slaveFlags,

http://git-wip-us.apache.org/repos/asf/mesos/blob/6850353e/src/slave/slave.hpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.hpp b/src/slave/slave.hpp
index fb911ef..4a3d014 100644
--- a/src/slave/slave.hpp
+++ b/src/slave/slave.hpp
@@ -711,6 +711,10 @@ private:
       const SlaveInfo& previous,
       const SlaveInfo& current) const;
 
+  void initializeResourceProviderManager(
+      const Flags& flags,
+      const SlaveID& slaveId);
+
   protobuf::master::Capabilities requiredMasterCapabilities;
 
   const Flags flags;
@@ -812,7 +816,7 @@ private:
   // (allocated and oversubscribable) resources.
   Option<Resources> oversubscribedResources;
 
-  ResourceProviderManager resourceProviderManager;
+  process::Owned<ResourceProviderManager> resourceProviderManager;
   process::Owned<LocalResourceProviderDaemon> localResourceProviderDaemon;
 
   // Local resource providers known by the agent.

http://git-wip-us.apache.org/repos/asf/mesos/blob/6850353e/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index c52541b..0de4e79 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -755,14 +755,17 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
-  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
-
   Owned<MasterDetector> detector = master.get()->createDetector();
 
+  // For the agent's resource provider manager to start,
+  // the agent needs to have been assigned an agent ID.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
   Try<Owned<cluster::Slave>> agent = StartSlave(detector.get());
   ASSERT_SOME(agent);
 
-  AWAIT_READY(__recover);
+  AWAIT_READY(slaveRegisteredMessage);
 
   // Wait for recovery to be complete.
   Clock::pause();


[9/9] mesos git commit: Removed redundant master flags in resource provider tests.

Posted by ch...@apache.org.
Removed redundant master flags in resource provider tests.

Review: https://reviews.apache.org/r/66780/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d4a903a4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d4a903a4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d4a903a4

Branch: refs/heads/master
Commit: d4a903a4a5a0022c1c5d2d5b6ef967ac9793230c
Parents: d7e2125
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue May 1 13:09:25 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue May 1 13:09:25 2018 -0700

----------------------------------------------------------------------
 src/tests/resource_provider_manager_tests.cpp | 12 ++++--------
 1 file changed, 4 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d4a903a4/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 1ee4dc3..889cb32 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -914,8 +914,7 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
 TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
 {
   // Start master and agent.
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
@@ -1050,8 +1049,7 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
   Clock::pause();
 
   // Start master and agent.
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
@@ -1305,8 +1303,7 @@ TEST_F(ResourceProviderManagerHttpApiTest, ResourceProviderSubscribeDisconnect)
   Clock::pause();
 
   // Start master and agent.
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();
@@ -1382,8 +1379,7 @@ TEST_F(ResourceProviderManagerHttpApiTest, Metrics)
   Clock::pause();
 
   // Start master and agent.
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
   Owned<MasterDetector> detector = master.get()->createDetector();


[7/9] mesos git commit: Added admitted resource providers to the manager's registry.

Posted by ch...@apache.org.
Added admitted resource providers to the manager's registry.

Review: https://reviews.apache.org/r/66545/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/65ed45fb
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/65ed45fb
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/65ed45fb

Branch: refs/heads/master
Commit: 65ed45fb039e7da64a58094aff67cf0189aa1f6d
Parents: 9a8cef0
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue May 1 13:09:19 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue May 1 13:09:19 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 45 ++++++++++++++++++++--
 src/resource_provider/registrar.cpp           | 10 +++++
 src/resource_provider/registry.proto          |  1 +
 src/tests/resource_provider_manager_tests.cpp | 28 ++++++++++----
 4 files changed, 73 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index bc52741..5979480 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -58,6 +58,7 @@ using std::string;
 
 using mesos::internal::resource_provider::validation::call::validate;
 
+using mesos::resource_provider::AdmitResourceProvider;
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::Registrar;
@@ -184,6 +185,10 @@ private:
       const HttpConnection& http,
       const Call::Subscribe& subscribe);
 
+  void _subscribe(
+      const Future<bool>& admitResourceProvider,
+      Owned<ResourceProvider> resourceProvider);
+
   void updateOperationStatus(
       ResourceProvider* resourceProvider,
       const Call::UpdateOperationStatus& update);
@@ -657,17 +662,53 @@ void ResourceProviderManagerProcess::subscribe(
   Owned<ResourceProvider> resourceProvider(
       new ResourceProvider(resourceProviderInfo, http));
 
+  Future<bool> admitResourceProvider;
+
   if (!resourceProviderInfo.has_id()) {
     // The resource provider is subscribing for the first time.
     resourceProvider->info.mutable_id()->CopyFrom(newResourceProviderId());
+
+    // If we are handing out a new `ResourceProviderID` persist the ID by
+    // triggering a `AdmitResourceProvider` operation on the registrar.
+    admitResourceProvider =
+      registrar->apply(Owned<mesos::resource_provider::Registrar::Operation>(
+          new AdmitResourceProvider(resourceProvider->info.id())));
   } else {
     // TODO(chhsiao): The resource provider is resubscribing after being
     // restarted or an agent failover. The 'ResourceProviderInfo' might
     // have been updated, but its type and name should remain the same.
     // We should checkpoint its 'type', 'name' and ID, then check if the
     // resubscribption is consistent with the checkpointed record.
+
+    // If the resource provider is known we do not need to admit it
+    // again, and the registrar operation implicitly succeeded.
+    admitResourceProvider = true;
   }
 
+  admitResourceProvider.onAny(defer(
+      self(),
+      &ResourceProviderManagerProcess::_subscribe,
+      lambda::_1,
+      std::move(resourceProvider)));
+}
+
+
+void ResourceProviderManagerProcess::_subscribe(
+    const Future<bool>& admitResourceProvider,
+    Owned<ResourceProvider> resourceProvider)
+{
+  if (!admitResourceProvider.isReady()) {
+    LOG(INFO)
+      << "Not subscribing resource provider " << resourceProvider->info.id()
+      << " as registry update did not succeed: " << admitResourceProvider;
+
+    return;
+  }
+
+  CHECK(admitResourceProvider.get())
+    << "Could not admit resource provider " << resourceProvider->info.id()
+    << " as registry update was rejected";
+
   const ResourceProviderID& resourceProviderId = resourceProvider->info.id();
 
   Event event;
@@ -681,7 +722,7 @@ void ResourceProviderManagerProcess::subscribe(
     return;
   }
 
-  http.closed()
+  resourceProvider->http.closed()
     .onAny(defer(self(), [=](const Future<Nothing>& future) {
       // Iff the remote side closes the HTTP connection, the future will be
       // ready. We will remove the resource provider in that case.
@@ -716,8 +757,6 @@ void ResourceProviderManagerProcess::subscribe(
     resourceProviders.known.put(
         resourceProviderId,
         std::move(resourceProvider_));
-
-    // TODO(bbannier): Persist this information in the registry.
   }
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index 6cc4625..a855a2b 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -115,6 +115,15 @@ Try<bool> AdmitResourceProvider::perform(Registry* registry)
     return Error("Resource provider already admitted");
   }
 
+  if (std::find_if(
+          registry->removed_resource_providers().begin(),
+          registry->removed_resource_providers().end(),
+          [this](const ResourceProvider& resourceProvider) {
+            return resourceProvider.id() == this->id;
+          }) != registry->removed_resource_providers().end()) {
+    return Error("Resource provider was removed");
+  }
+
   ResourceProvider resourceProvider;
   resourceProvider.mutable_id()->CopyFrom(id);
 
@@ -141,6 +150,7 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
     return Error("Attempted to remove an unknown resource provider");
   }
 
+  registry->add_removed_resource_providers()->CopyFrom(*pos);
   registry->mutable_resource_providers()->erase(pos);
 
   return true; // Mutation.

http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/resource_provider/registry.proto
----------------------------------------------------------------------
diff --git a/src/resource_provider/registry.proto b/src/resource_provider/registry.proto
index 14bd433..491263e 100644
--- a/src/resource_provider/registry.proto
+++ b/src/resource_provider/registry.proto
@@ -35,4 +35,5 @@ message ResourceProvider {
 // A top level object that is managed by the Registrar and persisted.
 message Registry {
   repeated ResourceProvider resource_providers = 1;
+  repeated ResourceProvider removed_resource_providers = 2;
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/65ed45fb/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index eb8e4fc..8c364a1 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -843,17 +843,24 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
 
   AWAIT_READY(registrar.get()->recover());
 
-  Future<bool> admitResourceProvider =
+  Future<bool> admitResourceProvider1 =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new AdmitResourceProvider(resourceProviderId)));
-  AWAIT_READY(admitResourceProvider);
-  EXPECT_TRUE(admitResourceProvider.get());
+  AWAIT_READY(admitResourceProvider1);
+  EXPECT_TRUE(admitResourceProvider1.get());
 
   Future<bool> removeResourceProvider =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new RemoveResourceProvider(resourceProviderId)));
   AWAIT_READY(removeResourceProvider);
   EXPECT_TRUE(removeResourceProvider.get());
+
+  // A removed resource provider cannot be admitted again.
+  Future<bool> admitResourceProvider2 =
+    registrar.get()->apply(Owned<Registrar::Operation>(
+        new AdmitResourceProvider(resourceProviderId)));
+  AWAIT_READY(admitResourceProvider2);
+  EXPECT_FALSE(admitResourceProvider2.get());
 }
 
 
@@ -879,19 +886,24 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
-  AWAIT_READY(masterRegistrar.recover(masterInfo));
-
-  Future<bool> admitResourceProvider =
+  Future<bool> admitResourceProvider1 =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new AdmitResourceProvider(resourceProviderId)));
-  AWAIT_READY(admitResourceProvider);
-  EXPECT_TRUE(admitResourceProvider.get());
+  AWAIT_READY(admitResourceProvider1);
+  EXPECT_TRUE(admitResourceProvider1.get());
 
   Future<bool> removeResourceProvider =
     registrar.get()->apply(Owned<Registrar::Operation>(
         new RemoveResourceProvider(resourceProviderId)));
   AWAIT_READY(removeResourceProvider);
   EXPECT_TRUE(removeResourceProvider.get());
+
+  // A removed resource provider cannot be admitted again.
+  Future<bool> admitResourceProvider2 =
+    registrar.get()->apply(Owned<Registrar::Operation>(
+        new AdmitResourceProvider(resourceProviderId)));
+  AWAIT_READY(admitResourceProvider2);
+  EXPECT_FALSE(admitResourceProvider2.get());
 }
 
 


[5/9] mesos git commit: Set up recovery code paths of resource provider manager.

Posted by ch...@apache.org.
Set up recovery code paths of resource provider manager.

This patch adjusts the control flow of the resource provider manager
so that we can make use of persisted resource provider information in
the future. While this patch sets up the needed flow, it does not yet
implement the recovery logic.

Review: https://reviews.apache.org/r/66311/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/169efe6e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/169efe6e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/169efe6e

Branch: refs/heads/master
Commit: 169efe6e832165d441559a38cd7d31b80d1c84ec
Parents: 2d81d7c
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue May 1 13:09:14 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue May 1 13:09:14 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 323 ++++++++++++---------
 src/resource_provider/registrar.cpp           |  96 +++---
 src/resource_provider/registrar.hpp           |  18 +-
 src/tests/resource_provider_manager_tests.cpp |  15 +-
 4 files changed, 262 insertions(+), 190 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/169efe6e/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 67dbfbe..dfb8e73 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -36,6 +36,7 @@
 #include <process/metrics/metrics.hpp>
 
 #include <stout/hashmap.hpp>
+#include <stout/nothing.hpp>
 #include <stout/protobuf.hpp>
 #include <stout/uuid.hpp>
 
@@ -47,6 +48,7 @@
 #include "internal/devolve.hpp"
 #include "internal/evolve.hpp"
 
+#include "resource_provider/registry.hpp"
 #include "resource_provider/validation.hpp"
 
 namespace http = process::http;
@@ -60,6 +62,8 @@ using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
 using mesos::resource_provider::Registrar;
 
+using mesos::resource_provider::registry::Registry;
+
 using process::Failure;
 using process::Future;
 using process::Owned;
@@ -76,10 +80,10 @@ using process::wait;
 
 using process::http::Accepted;
 using process::http::BadRequest;
-using process::http::OK;
 using process::http::MethodNotAllowed;
 using process::http::NotAcceptable;
 using process::http::NotImplemented;
+using process::http::OK;
 using process::http::Pipe;
 using process::http::UnsupportedMediaType;
 
@@ -192,6 +196,11 @@ private:
       ResourceProvider* resourceProvider,
       const Call::UpdatePublishResourcesStatus& update);
 
+  Future<Nothing> recover(
+      const mesos::resource_provider::registry::Registry& registry);
+
+  void initialize() override;
+
   ResourceProviderID newResourceProviderId();
 
   double gaugeSubscribed();
@@ -209,6 +218,9 @@ private:
     PullGauge subscribed;
   };
 
+  Owned<Registrar> registrar;
+  Promise<Nothing> recovered;
+
   Metrics metrics;
 };
 
@@ -216,152 +228,191 @@ private:
 ResourceProviderManagerProcess::ResourceProviderManagerProcess(
     Owned<Registrar> _registrar)
   : ProcessBase(process::ID::generate("resource-provider-manager")),
+    registrar(std::move(_registrar)),
     metrics(*this)
 {
-  CHECK_NOTNULL(_registrar.get());
+  CHECK_NOTNULL(registrar.get());
 }
 
 
-Future<http::Response> ResourceProviderManagerProcess::api(
-    const http::Request& request,
-    const Option<Principal>& principal)
+void ResourceProviderManagerProcess::initialize()
 {
-  if (request.method != "POST") {
-    return MethodNotAllowed({"POST"}, request.method);
-  }
-
-  v1::resource_provider::Call v1Call;
-
-  // TODO(anand): Content type values are case-insensitive.
-  Option<string> contentType = request.headers.get("Content-Type");
-
-  if (contentType.isNone()) {
-    return BadRequest("Expecting 'Content-Type' to be present");
-  }
-
-  if (contentType.get() == APPLICATION_PROTOBUF) {
-    if (!v1Call.ParseFromString(request.body)) {
-      return BadRequest("Failed to parse body into Call protobuf");
-    }
-  } else if (contentType.get() == APPLICATION_JSON) {
-    Try<JSON::Value> value = JSON::parse(request.body);
-    if (value.isError()) {
-      return BadRequest("Failed to parse body into JSON: " + value.error());
-    }
-
-    Try<v1::resource_provider::Call> parse =
-      ::protobuf::parse<v1::resource_provider::Call>(value.get());
-
-    if (parse.isError()) {
-      return BadRequest("Failed to convert JSON into Call protobuf: " +
-                        parse.error());
-    }
-
-    v1Call = parse.get();
-  } else {
-    return UnsupportedMediaType(
-        string("Expecting 'Content-Type' of ") +
-        APPLICATION_JSON + " or " + APPLICATION_PROTOBUF);
-  }
-
-  Call call = devolve(v1Call);
-
-  Option<Error> error = validate(call);
-  if (error.isSome()) {
-    return BadRequest(
-        "Failed to validate resource_provider::Call: " + error->message);
-  }
-
-  if (call.type() == Call::SUBSCRIBE) {
-    // We default to JSON 'Content-Type' in the response since an empty
-    // 'Accept' header results in all media types considered acceptable.
-    ContentType acceptType = ContentType::JSON;
-
-    if (request.acceptsMediaType(APPLICATION_JSON)) {
-      acceptType = ContentType::JSON;
-    } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
-      acceptType = ContentType::PROTOBUF;
-    } else {
-      return NotAcceptable(
-          string("Expecting 'Accept' to allow ") +
-          "'" + APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
-    }
-
-    if (request.headers.contains("Mesos-Stream-Id")) {
-      return BadRequest(
-          "Subscribe calls should not include the 'Mesos-Stream-Id' header");
-    }
-
-    Pipe pipe;
-    OK ok;
-
-    ok.headers["Content-Type"] = stringify(acceptType);
-    ok.type = http::Response::PIPE;
-    ok.reader = pipe.reader();
-
-    // Generate a stream ID and return it in the response.
-    id::UUID streamId = id::UUID::random();
-    ok.headers["Mesos-Stream-Id"] = streamId.toString();
-
-    HttpConnection http(pipe.writer(), acceptType, streamId);
-    subscribe(http, call.subscribe());
-
-    return ok;
-  }
-
-  if (!resourceProviders.subscribed.contains(call.resource_provider_id())) {
-    return BadRequest("Resource provider is not subscribed");
-  }
-
-  ResourceProvider* resourceProvider =
-    resourceProviders.subscribed.at(call.resource_provider_id()).get();
-
-  // This isn't a `SUBSCRIBE` call, so the request should include a stream ID.
-  if (!request.headers.contains("Mesos-Stream-Id")) {
-    return BadRequest(
-        "All non-subscribe calls should include to 'Mesos-Stream-Id' header");
-  }
-
-  const string& streamId = request.headers.at("Mesos-Stream-Id");
-  if (streamId != resourceProvider->http.streamId.toString()) {
-    return BadRequest(
-        "The stream ID '" + streamId + "' included in this request "
-        "didn't match the stream ID currently associated with "
-        " resource provider ID " + resourceProvider->info.id().value());
-  }
-
-  switch(call.type()) {
-    case Call::UNKNOWN: {
-      return NotImplemented();
-    }
-
-    case Call::SUBSCRIBE: {
-      // `SUBSCRIBE` call should have been handled above.
-      LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
-    }
+  // Recover the registrar.
+  registrar->recover()
+    .then(defer(self(), &ResourceProviderManagerProcess::recover, lambda::_1))
+    .onAny([](const Future<Nothing>& recovered) {
+      if (!recovered.isReady()) {
+        LOG(FATAL)
+        << "Failed to recover resource provider manager registry: "
+        << recovered;
+      }
+    });
+}
 
-    case Call::UPDATE_OPERATION_STATUS: {
-      updateOperationStatus(
-          resourceProvider,
-          call.update_operation_status());
 
-      return Accepted();
-    }
+Future<Nothing> ResourceProviderManagerProcess::recover(
+    const mesos::resource_provider::registry::Registry& registry)
+{
+  recovered.set(Nothing());
 
-    case Call::UPDATE_STATE: {
-      updateState(resourceProvider, call.update_state());
-      return Accepted();
-    }
+  return Nothing();
+}
 
-    case Call::UPDATE_PUBLISH_RESOURCES_STATUS: {
-      updatePublishResourcesStatus(
-          resourceProvider,
-          call.update_publish_resources_status());
-      return Accepted();
-    }
-  }
 
-  UNREACHABLE();
+Future<http::Response> ResourceProviderManagerProcess::api(
+    const http::Request& request,
+    const Option<Principal>& principal)
+{
+  // TODO(bbannier): This implementation does not limit the number of messages
+  // in the actor's inbox which could become large should a big number of
+  // resource providers attempt to subscribe before recovery completed. Consider
+  // rejecting requests until the resource provider manager has recovered. This
+  // would likely require implementing retry logic in resource providers.
+  return recovered.future().then(defer(
+      self(), [this, request, principal](const Nothing&) -> http::Response {
+        if (request.method != "POST") {
+          return MethodNotAllowed({"POST"}, request.method);
+        }
+
+        v1::resource_provider::Call v1Call;
+
+        // TODO(anand): Content type values are case-insensitive.
+        Option<string> contentType = request.headers.get("Content-Type");
+
+        if (contentType.isNone()) {
+          return BadRequest("Expecting 'Content-Type' to be present");
+        }
+
+        if (contentType.get() == APPLICATION_PROTOBUF) {
+          if (!v1Call.ParseFromString(request.body)) {
+            return BadRequest("Failed to parse body into Call protobuf");
+          }
+        } else if (contentType.get() == APPLICATION_JSON) {
+          Try<JSON::Value> value = JSON::parse(request.body);
+          if (value.isError()) {
+            return BadRequest(
+                "Failed to parse body into JSON: " + value.error());
+          }
+
+          Try<v1::resource_provider::Call> parse =
+            ::protobuf::parse<v1::resource_provider::Call>(value.get());
+
+          if (parse.isError()) {
+            return BadRequest(
+                "Failed to convert JSON into Call protobuf: " + parse.error());
+          }
+
+          v1Call = parse.get();
+        } else {
+          return UnsupportedMediaType(
+              string("Expecting 'Content-Type' of ") + APPLICATION_JSON +
+              " or " + APPLICATION_PROTOBUF);
+        }
+
+        Call call = devolve(v1Call);
+
+        Option<Error> error = validate(call);
+        if (error.isSome()) {
+          return BadRequest(
+              "Failed to validate resource_provider::Call: " + error->message);
+        }
+
+        if (call.type() == Call::SUBSCRIBE) {
+          // We default to JSON 'Content-Type' in the response since an empty
+          // 'Accept' header results in all media types considered acceptable.
+          ContentType acceptType = ContentType::JSON;
+
+          if (request.acceptsMediaType(APPLICATION_JSON)) {
+            acceptType = ContentType::JSON;
+          } else if (request.acceptsMediaType(APPLICATION_PROTOBUF)) {
+            acceptType = ContentType::PROTOBUF;
+          } else {
+            return NotAcceptable(
+                string("Expecting 'Accept' to allow ") + "'" +
+                APPLICATION_PROTOBUF + "' or '" + APPLICATION_JSON + "'");
+          }
+
+          if (request.headers.contains("Mesos-Stream-Id")) {
+            return BadRequest(
+                "Subscribe calls should not include the 'Mesos-Stream-Id' "
+                "header");
+          }
+
+          Pipe pipe;
+          OK ok;
+
+          ok.headers["Content-Type"] = stringify(acceptType);
+          ok.type = http::Response::PIPE;
+          ok.reader = pipe.reader();
+
+          // Generate a stream ID and return it in the response.
+          id::UUID streamId = id::UUID::random();
+          ok.headers["Mesos-Stream-Id"] = streamId.toString();
+
+          HttpConnection http(pipe.writer(), acceptType, streamId);
+          this->subscribe(http, call.subscribe());
+
+          return std::move(ok);
+        }
+
+        if (!this->resourceProviders.subscribed.contains(
+                call.resource_provider_id())) {
+          return BadRequest("Resource provider is not subscribed");
+        }
+
+        ResourceProvider* resourceProvider =
+          this->resourceProviders.subscribed.at(call.resource_provider_id())
+            .get();
+
+        // This isn't a `SUBSCRIBE` call, so the request should include a stream
+        // ID.
+        if (!request.headers.contains("Mesos-Stream-Id")) {
+          return BadRequest(
+              "All non-subscribe calls should include to 'Mesos-Stream-Id' "
+              "header");
+        }
+
+        const string& streamId = request.headers.at("Mesos-Stream-Id");
+        if (streamId != resourceProvider->http.streamId.toString()) {
+          return BadRequest(
+              "The stream ID '" + streamId +
+              "' included in this request "
+              "didn't match the stream ID currently associated with "
+              " resource provider ID " +
+              resourceProvider->info.id().value());
+        }
+
+        switch (call.type()) {
+          case Call::UNKNOWN: {
+            return NotImplemented();
+          }
+
+          case Call::SUBSCRIBE: {
+            // `SUBSCRIBE` call should have been handled above.
+            LOG(FATAL) << "Unexpected 'SUBSCRIBE' call";
+          }
+
+          case Call::UPDATE_OPERATION_STATUS: {
+            this->updateOperationStatus(
+                resourceProvider, call.update_operation_status());
+
+            return Accepted();
+          }
+
+          case Call::UPDATE_STATE: {
+            this->updateState(resourceProvider, call.update_state());
+            return Accepted();
+          }
+
+          case Call::UPDATE_PUBLISH_RESOURCES_STATUS: {
+            this->updatePublishResourcesStatus(
+                resourceProvider, call.update_publish_resources_status());
+            return Accepted();
+          }
+        }
+
+        UNREACHABLE();
+      }));
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/169efe6e/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index b151e2b..6cc4625 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -92,9 +92,11 @@ Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage)
 }
 
 
-Try<Owned<Registrar>> Registrar::create(master::Registrar* registrar)
+Try<Owned<Registrar>> Registrar::create(
+    master::Registrar* registrar,
+    Registry registry)
 {
-  return new MasterRegistrar(registrar);
+  return new MasterRegistrar(registrar, std::move(registry));
 }
 
 
@@ -150,34 +152,33 @@ class GenericRegistrarProcess : public Process<GenericRegistrarProcess>
 public:
   GenericRegistrarProcess(Owned<Storage> storage);
 
-  Future<Nothing> recover();
+  Future<Registry> recover();
 
   Future<bool> apply(Owned<Registrar::Operation> operation);
 
-  Future<bool> _apply(Owned<Registrar::Operation> operation);
-
   void update();
 
+  void initialize() override;
+
+private:
+  Future<bool> _apply(Owned<Registrar::Operation> operation);
+
   void _update(
       const Future<Option<Variable<Registry>>>& store,
-      const Registry& updatedRegistry,
       deque<Owned<Registrar::Operation>> applied);
 
-private:
+  // We explicitly hold the storage to keep it alive over the
+  // registrar's lifetime.
   Owned<Storage> storage;
 
   // Use fully qualified type for `State` to disambiguate with `State`
   // enumeration in `ProcessBase`.
   mesos::state::protobuf::State state;
 
-  Option<Future<Nothing>> recovered;
-  Option<Registry> registry;
+  Promise<Nothing> recovered;
   Option<Variable<Registry>> variable;
-
   Option<Error> error;
-
   deque<Owned<Registrar::Operation>> operations;
-
   bool updating = false;
 };
 
@@ -191,32 +192,37 @@ GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
 }
 
 
-Future<Nothing> GenericRegistrarProcess::recover()
+void GenericRegistrarProcess::initialize()
 {
   constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";
 
-  if (recovered.isNone()) {
-    recovered = state.fetch<Registry>(NAME).then(
-        defer(self(), [this](const Variable<Registry>& recovery) {
-          registry = recovery.get();
-          variable = recovery;
+  CHECK_NONE(variable);
+
+  recovered.associate(state.fetch<Registry>(NAME).then(
+      defer(self(), [this](const Variable<Registry>& recovery) {
+        variable = recovery;
+        return Nothing();
+      })));
+}
 
-          return Nothing();
-        }));
-  }
 
-  return recovered.get();
+Future<Registry> GenericRegistrarProcess::recover()
+{
+  // Prevent discards on the returned `Future` by marking the result as
+  // `undiscardable` so that we control the lifetime of the recovering registry.
+  return undiscardable(recovered.future())
+    .then(defer(self(), [this](const Nothing&) {
+      CHECK_SOME(this->variable);
+      return this->variable->get();
+    }));
 }
 
 
 Future<bool> GenericRegistrarProcess::apply(
     Owned<Registrar::Operation> operation)
 {
-  if (recovered.isNone()) {
-    return Failure("Attempted to apply the operation before recovering");
-  }
-
-  return recovered->then(defer(self(), &Self::_apply, std::move(operation)));
+  return undiscardable(recovered.future()).then(
+      defer(self(), &Self::_apply, std::move(operation)));
 }
 
 
@@ -249,8 +255,9 @@ void GenericRegistrarProcess::update()
 
   updating = true;
 
-  CHECK_SOME(registry);
-  Registry updatedRegistry = registry.get();
+  CHECK_SOME(variable);
+
+  Registry updatedRegistry = variable->get();
 
   foreach (Owned<Registrar::Operation>& operation, operations) {
     Try<bool> operationResult = (*operation)(&updatedRegistry);
@@ -272,7 +279,6 @@ void GenericRegistrarProcess::update()
       self(),
       &Self::_update,
       lambda::_1,
-      updatedRegistry,
       std::move(operations)));
 
   operations.clear();
@@ -281,7 +287,6 @@ void GenericRegistrarProcess::update()
 
 void GenericRegistrarProcess::_update(
     const Future<Option<Variable<Registry>>>& store,
-    const Registry& updatedRegistry,
     deque<Owned<Registrar::Operation>> applied)
 {
   updating = false;
@@ -310,7 +315,6 @@ void GenericRegistrarProcess::_update(
   }
 
   variable = store->get();
-  registry = updatedRegistry;
 
   // Remove the operations.
   while (!applied.empty()) {
@@ -340,7 +344,7 @@ GenericRegistrar::~GenericRegistrar()
 }
 
 
-Future<Nothing> GenericRegistrar::recover()
+Future<Registry> GenericRegistrar::recover()
 {
   return dispatch(process.get(), &GenericRegistrarProcess::recover);
 }
@@ -364,6 +368,8 @@ class MasterRegistrarProcess : public Process<MasterRegistrarProcess>
   public:
     AdaptedOperation(Owned<Registrar::Operation> operation);
 
+    Future<registry::Registry> recover();
+
   private:
     Try<bool> perform(internal::Registry* registry, hashset<SlaveID>*) override;
 
@@ -376,12 +382,17 @@ class MasterRegistrarProcess : public Process<MasterRegistrarProcess>
   };
 
 public:
-  explicit MasterRegistrarProcess(master::Registrar* registrar);
+  explicit MasterRegistrarProcess(
+      master::Registrar* registrar,
+      Registry registry);
 
   Future<bool> apply(Owned<Registrar::Operation> operation);
 
+  Future<registry::Registry> recover() { return registry; }
+
 private:
   master::Registrar* registrar = nullptr;
+  Registry registry;
 };
 
 
@@ -398,9 +409,12 @@ Try<bool> MasterRegistrarProcess::AdaptedOperation::perform(
 }
 
 
-MasterRegistrarProcess::MasterRegistrarProcess(master::Registrar* _registrar)
+MasterRegistrarProcess::MasterRegistrarProcess(
+    master::Registrar* _registrar,
+    registry::Registry _registry)
   : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
-    registrar(_registrar) {}
+    registrar(_registrar),
+    registry(std::move(_registry)) {}
 
 
 Future<bool> MasterRegistrarProcess::apply(
@@ -413,8 +427,10 @@ Future<bool> MasterRegistrarProcess::apply(
 }
 
 
-MasterRegistrar::MasterRegistrar(master::Registrar* registrar)
-  : process(new MasterRegistrarProcess(registrar))
+MasterRegistrar::MasterRegistrar(
+    master::Registrar* registrar,
+    registry::Registry registry)
+  : process(new MasterRegistrarProcess(registrar, std::move(registry)))
 {
   spawn(process.get(), false);
 }
@@ -427,9 +443,9 @@ MasterRegistrar::~MasterRegistrar()
 }
 
 
-Future<Nothing> MasterRegistrar::recover()
+Future<Registry> MasterRegistrar::recover()
 {
-  return Nothing();
+  return dispatch(process.get(), &MasterRegistrarProcess::recover);
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/169efe6e/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
index 3c10785..ded56e1 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -71,12 +71,16 @@ public:
       process::Owned<state::Storage> storage);
 
   // Create a registry on top of a master's persistent state.
+  //
+  // The created registrar does not take ownership of the passed registrar
+  // which needs to be valid as long as the created registrar is alive.
   static Try<process::Owned<Registrar>> create(
-      mesos::internal::master::Registrar* registrar);
+      mesos::internal::master::Registrar* registrar,
+      registry::Registry registry);
 
   virtual ~Registrar() = default;
 
-  virtual process::Future<Nothing> recover() = 0;
+  virtual process::Future<registry::Registry> recover() = 0;
   virtual process::Future<bool> apply(process::Owned<Operation> operation) = 0;
 };
 
@@ -115,7 +119,7 @@ public:
 
   ~GenericRegistrar() override;
 
-  process::Future<Nothing> recover() override;
+  process::Future<registry::Registry> recover() override;
 
   process::Future<bool> apply(process::Owned<Operation> operation) override;
 
@@ -130,13 +134,17 @@ class MasterRegistrarProcess;
 class MasterRegistrar : public Registrar
 {
 public:
-  explicit MasterRegistrar(mesos::internal::master::Registrar* Registrar);
+  // The created registrar does not take ownership of the passed registrar
+  // which needs to be valid as long as the created registrar is alive.
+  explicit MasterRegistrar(
+      mesos::internal::master::Registrar* registrar,
+      registry::Registry registry);
 
   ~MasterRegistrar() override;
 
   // This registrar performs no recovery; instead to recover
   // the underlying master registrar needs to be recovered.
-  process::Future<Nothing> recover() override;
+  process::Future<registry::Registry> recover() override;
 
   process::Future<bool> apply(process::Owned<Operation> operation) override;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/169efe6e/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 1664073..eb8e4fc 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -841,10 +841,6 @@ TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
-  // Applying operations on a not yet recovered registrar fails.
-  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
-      new AdmitResourceProvider(resourceProviderId))));
-
   AWAIT_READY(registrar.get()->recover());
 
   Future<bool> admitResourceProvider =
@@ -873,15 +869,16 @@ TEST_F(ResourceProviderRegistrarTest, MasterRegistrar)
 
   const MasterInfo masterInfo = protobuf::createMasterInfo({});
 
-  Try<Owned<Registrar>> registrar = Registrar::create(&masterRegistrar);
+  Future<Registry> registry = masterRegistrar.recover(masterInfo);
+  AWAIT_READY(registry);
+
+  Try<Owned<Registrar>> registrar = Registrar::create(
+      &masterRegistrar,
+      registry->resource_provider_registry());
 
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());
 
-  // Applying operations on a not yet recovered registrar fails.
-  AWAIT_FAILED(registrar.get()->apply(Owned<Registrar::Operation>(
-      new AdmitResourceProvider(resourceProviderId))));
-
   AWAIT_READY(masterRegistrar.recover(masterInfo));
 
   Future<bool> admitResourceProvider =


[3/9] mesos git commit: Renamed resource provider `AgentRegistrar` to `GenericRegistrar`.

Posted by ch...@apache.org.
Renamed resource provider `AgentRegistrar` to `GenericRegistrar`.

This registrar and its matching process can work on generic storage
and it is currently used to work on an agent's persistent store.

Its name should not be tied to the agent, so we rename the registrar
in this patch.

Review: https://reviews.apache.org/r/66526/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/e286bbf2
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/e286bbf2
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/e286bbf2

Branch: refs/heads/master
Commit: e286bbf258fa6d5434f8c504a03abc862dd87f62
Parents: 3f0cd11
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue May 1 13:09:05 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue May 1 13:09:05 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/registrar.cpp           | 35 +++++++++++-----------
 src/resource_provider/registrar.hpp           | 10 +++----
 src/tests/resource_provider_manager_tests.cpp |  4 +--
 3 files changed, 25 insertions(+), 24 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/e286bbf2/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index dbb55dd..53b403e 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -88,7 +88,7 @@ bool Registrar::Operation::set()
 
 Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage)
 {
-  return new AgentRegistrar(std::move(storage));
+  return new GenericRegistrar(std::move(storage));
 }
 
 
@@ -145,10 +145,10 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
 }
 
 
-class AgentRegistrarProcess : public Process<AgentRegistrarProcess>
+class GenericRegistrarProcess : public Process<GenericRegistrarProcess>
 {
 public:
-  AgentRegistrarProcess(Owned<Storage> storage);
+  GenericRegistrarProcess(Owned<Storage> storage);
 
   Future<Nothing> recover();
 
@@ -182,13 +182,13 @@ private:
 };
 
 
-AgentRegistrarProcess::AgentRegistrarProcess(Owned<Storage> _storage)
-  : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
+GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
+  : ProcessBase(process::ID::generate("resource-provider-generic-registrar")),
     storage(std::move(_storage)),
     state(storage.get()) {}
 
 
-Future<Nothing> AgentRegistrarProcess::recover()
+Future<Nothing> GenericRegistrarProcess::recover()
 {
   constexpr char NAME[] = "RESOURCE_PROVIDER_REGISTRAR";
 
@@ -206,7 +206,8 @@ Future<Nothing> AgentRegistrarProcess::recover()
 }
 
 
-Future<bool> AgentRegistrarProcess::apply(Owned<Registrar::Operation> operation)
+Future<bool> GenericRegistrarProcess::apply(
+    Owned<Registrar::Operation> operation)
 {
   if (recovered.isNone()) {
     return Failure("Attempted to apply the operation before recovering");
@@ -216,7 +217,7 @@ Future<bool> AgentRegistrarProcess::apply(Owned<Registrar::Operation> operation)
 }
 
 
-Future<bool> AgentRegistrarProcess::_apply(
+Future<bool> GenericRegistrarProcess::_apply(
     Owned<Registrar::Operation> operation)
 {
   if (error.isSome()) {
@@ -234,7 +235,7 @@ Future<bool> AgentRegistrarProcess::_apply(
 }
 
 
-void AgentRegistrarProcess::update()
+void GenericRegistrarProcess::update()
 {
   CHECK(!updating);
   CHECK_NONE(error);
@@ -275,7 +276,7 @@ void AgentRegistrarProcess::update()
 }
 
 
-void AgentRegistrarProcess::_update(
+void GenericRegistrarProcess::_update(
     const Future<Option<Variable<Registry>>>& store,
     const Registry& updatedRegistry,
     deque<Owned<Registrar::Operation>> applied)
@@ -322,31 +323,31 @@ void AgentRegistrarProcess::_update(
 }
 
 
-AgentRegistrar::AgentRegistrar(Owned<Storage> storage)
-  : process(new AgentRegistrarProcess(std::move(storage)))
+GenericRegistrar::GenericRegistrar(Owned<Storage> storage)
+  : process(new GenericRegistrarProcess(std::move(storage)))
 {
   process::spawn(process.get(), false);
 }
 
 
-AgentRegistrar::~AgentRegistrar()
+GenericRegistrar::~GenericRegistrar()
 {
   process::terminate(*process);
   process::wait(*process);
 }
 
 
-Future<Nothing> AgentRegistrar::recover()
+Future<Nothing> GenericRegistrar::recover()
 {
-  return dispatch(process.get(), &AgentRegistrarProcess::recover);
+  return dispatch(process.get(), &GenericRegistrarProcess::recover);
 }
 
 
-Future<bool> AgentRegistrar::apply(Owned<Operation> operation)
+Future<bool> GenericRegistrar::apply(Owned<Operation> operation)
 {
   return dispatch(
       process.get(),
-      &AgentRegistrarProcess::apply,
+      &GenericRegistrarProcess::apply,
       std::move(operation));
 }
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e286bbf2/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
index 34cb166..3c10785 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -105,22 +105,22 @@ private:
 };
 
 
-class AgentRegistrarProcess;
+class GenericRegistrarProcess;
 
 
-class AgentRegistrar : public Registrar
+class GenericRegistrar : public Registrar
 {
 public:
-  AgentRegistrar(process::Owned<state::Storage> storage);
+  GenericRegistrar(process::Owned<state::Storage> storage);
 
-  ~AgentRegistrar() override;
+  ~GenericRegistrar() override;
 
   process::Future<Nothing> recover() override;
 
   process::Future<bool> apply(process::Owned<Operation> operation) override;
 
 private:
-  std::unique_ptr<AgentRegistrarProcess> process;
+  std::unique_ptr<GenericRegistrarProcess> process;
 };
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/e286bbf2/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 72e8122..ceb7854 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -819,8 +819,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, AgentEndpoint)
 class ResourceProviderRegistrarTest : public tests::MesosTest {};
 
 
-// Test that the agent resource provider registrar works as expected.
-TEST_F(ResourceProviderRegistrarTest, AgentRegistrar)
+// Test that the generic resource provider registrar works as expected.
+TEST_F(ResourceProviderRegistrarTest, GenericRegistrar)
 {
   ResourceProviderID resourceProviderId;
   resourceProviderId.set_value("foo");


[2/9] mesos git commit: Externalized creation of resource provider manager backing storage.

Posted by ch...@apache.org.
Externalized creation of resource provider manager backing storage.

This patch changes the way the storage backing an agent's resource
provider registrar is created: previously we created it implicitly
when constructing the registrar; now the registrar consumes storage
that is passed in at construction.

Being able to explicitly inject the used storage simplifies testing.

Review: https://reviews.apache.org/r/66309/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/3f0cd112
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/3f0cd112
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/3f0cd112

Branch: refs/heads/master
Commit: 3f0cd112cf9645ee28b366e6081069662c619e70
Parents: 6850353
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue May 1 13:09:03 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue May 1 13:09:03 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/registrar.cpp           | 49 ++++------------------
 src/resource_provider/registrar.hpp           | 15 ++++---
 src/tests/resource_provider_manager_tests.cpp | 27 +-----------
 3 files changed, 16 insertions(+), 75 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/3f0cd112/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index 92ef9ae..dbb55dd 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -27,10 +27,6 @@
 
 #include <mesos/state/in_memory.hpp>
 
-#ifndef __WINDOWS__
-#include <mesos/state/leveldb.hpp>
-#endif // __WINDOWS__
-
 #include <mesos/state/protobuf.hpp>
 
 #include <process/defer.hpp>
@@ -53,12 +49,6 @@ using std::string;
 using mesos::resource_provider::registry::Registry;
 using mesos::resource_provider::registry::ResourceProvider;
 
-using mesos::state::InMemoryStorage;
-
-#ifndef __WINDOWS__
-using mesos::state::LevelDBStorage;
-#endif // __WINDOWS__
-
 using mesos::state::Storage;
 
 using mesos::state::protobuf::Variable;
@@ -96,11 +86,9 @@ bool Registrar::Operation::set()
 }
 
 
-Try<Owned<Registrar>> Registrar::create(
-    const slave::Flags& slaveFlags,
-    const SlaveID& slaveId)
+Try<Owned<Registrar>> Registrar::create(Owned<Storage> storage)
 {
-  return new AgentRegistrar(slaveFlags, slaveId);
+  return new AgentRegistrar(std::move(storage));
 }
 
 
@@ -160,7 +148,7 @@ Try<bool> RemoveResourceProvider::perform(Registry* registry)
 class AgentRegistrarProcess : public Process<AgentRegistrarProcess>
 {
 public:
-  AgentRegistrarProcess(const slave::Flags& flags, const SlaveID& slaveId);
+  AgentRegistrarProcess(Owned<Storage> storage);
 
   Future<Nothing> recover();
 
@@ -191,33 +179,12 @@ private:
   deque<Owned<Registrar::Operation>> operations;
 
   bool updating = false;
-
-  static Owned<Storage> createStorage(const std::string& path);
 };
 
 
-Owned<Storage> AgentRegistrarProcess::createStorage(const std::string& path)
-{
-  // The registrar uses LevelDB as underlying storage. Since LevelDB
-  // is currently not supported on Windows (see MESOS-5932), we fall
-  // back to in-memory storage there.
-  //
-  // TODO(bbannier): Remove this Windows workaround once MESOS-5932 is fixed.
-#ifndef __WINDOWS__
-  return Owned<Storage>(new LevelDBStorage(path));
-#else
-  LOG(WARNING)
-    << "Persisting resource provider manager state is not supported on Windows";
-  return Owned<Storage>(new InMemoryStorage());
-#endif // __WINDOWS__
-}
-
-
-AgentRegistrarProcess::AgentRegistrarProcess(
-    const slave::Flags& flags, const SlaveID& slaveId)
+AgentRegistrarProcess::AgentRegistrarProcess(Owned<Storage> _storage)
   : ProcessBase(process::ID::generate("resource-provider-agent-registrar")),
-    storage(createStorage(slave::paths::getResourceProviderRegistryPath(
-        flags.work_dir, slaveId))),
+    storage(std::move(_storage)),
     state(storage.get()) {}
 
 
@@ -355,10 +322,8 @@ void AgentRegistrarProcess::_update(
 }
 
 
-AgentRegistrar::AgentRegistrar(
-    const slave::Flags& slaveFlags,
-    const SlaveID& slaveId)
-  : process(new AgentRegistrarProcess(slaveFlags, slaveId))
+AgentRegistrar::AgentRegistrar(Owned<Storage> storage)
+  : process(new AgentRegistrarProcess(std::move(storage)))
 {
   process::spawn(process.get(), false);
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f0cd112/src/resource_provider/registrar.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.hpp b/src/resource_provider/registrar.hpp
index 39f45b0..34cb166 100644
--- a/src/resource_provider/registrar.hpp
+++ b/src/resource_provider/registrar.hpp
@@ -19,6 +19,8 @@
 
 #include <memory>
 
+#include <mesos/state/storage.hpp>
+
 #include <process/future.hpp>
 #include <process/owned.hpp>
 
@@ -64,14 +66,13 @@ public:
     bool success = false;
   };
 
-  // Create a registry on top of a master's persistent state.
+  // Create a registry on top of generic storage.
   static Try<process::Owned<Registrar>> create(
-      mesos::internal::master::Registrar* registrar);
+      process::Owned<state::Storage> storage);
 
-  // Create a registry on top of an agent's persistent state.
+  // Create a registry on top of a master's persistent state.
   static Try<process::Owned<Registrar>> create(
-      const mesos::internal::slave::Flags& slaveFlags,
-      const SlaveID& slaveId);
+      mesos::internal::master::Registrar* registrar);
 
   virtual ~Registrar() = default;
 
@@ -110,9 +111,7 @@ class AgentRegistrarProcess;
 class AgentRegistrar : public Registrar
 {
 public:
-  AgentRegistrar(
-      const mesos::internal::slave::Flags& slaveFlags,
-      const SlaveID& slaveId);
+  AgentRegistrar(process::Owned<state::Storage> storage);
 
   ~AgentRegistrar() override;
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/3f0cd112/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 0de4e79..72e8122 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -825,31 +825,8 @@ TEST_F(ResourceProviderRegistrarTest, AgentRegistrar)
   ResourceProviderID resourceProviderId;
   resourceProviderId.set_value("foo");
 
-  Try<Owned<cluster::Master>> master = StartMaster();
-  ASSERT_SOME(master);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-
-  const slave::Flags flags = CreateSlaveFlags();
-
-  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
-    FUTURE_PROTOBUF(SlaveRegisteredMessage(), master.get()->pid, _);
-
-  Future<UpdateSlaveMessage> updateSlaveMessage =
-    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
-
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), flags);
-  ASSERT_SOME(slave);
-
-  AWAIT_READY(slaveRegisteredMessage);
-
-  // The agent will send `UpdateSlaveMessage` after it has created its
-  // meta directories. Await the message to make sure the agent
-  // registrar can create its store in the meta hierarchy.
-  AWAIT_READY(updateSlaveMessage);
-
-  Try<Owned<Registrar>> registrar =
-    Registrar::create(flags, slaveRegisteredMessage->slave_id());
+  Owned<mesos::state::Storage> storage(new mesos::state::InMemoryStorage());
+  Try<Owned<Registrar>> registrar = Registrar::create(std::move(storage));
 
   ASSERT_SOME(registrar);
   ASSERT_NE(nullptr, registrar->get());


[8/9] mesos git commit: Prevent resubscription of resource providers with unknown IDs.

Posted by ch...@apache.org.
Prevent resubscription of resource providers with unknown IDs.

This patch adds a check to the resource provider manager's subscribe
functionality making sure that any ID sent by a resubscribing resource
provider corresponds to some previously known resource provider.

This not only serves as convenient validation of user-provided data,
but also makes sure that the internal state of the resource provider
manager remains consistent.

Review: https://reviews.apache.org/r/66546/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/d7e21259
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/d7e21259
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/d7e21259

Branch: refs/heads/master
Commit: d7e21259f107fde235c5482818702d873f2e12fb
Parents: 65ed45f
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue May 1 13:09:22 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue May 1 13:09:22 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 13 ++++-
 src/tests/resource_provider_manager_tests.cpp | 61 ++++++++++++++++++++++
 2 files changed, 73 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/d7e21259/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 5979480..aff1ca5 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -678,7 +678,18 @@ void ResourceProviderManagerProcess::subscribe(
     // restarted or an agent failover. The 'ResourceProviderInfo' might
     // have been updated, but its type and name should remain the same.
     // We should checkpoint its 'type', 'name' and ID, then check if the
-    // resubscribption is consistent with the checkpointed record.
+    // resubscription is consistent with the checkpointed record.
+
+    const ResourceProviderID& resourceProviderId = resourceProviderInfo.id();
+
+    if (!resourceProviders.known.contains(resourceProviderId)) {
+      LOG(INFO)
+        << "Dropping resubscription attempt of resource provider with ID "
+        << resourceProviderId
+        << " since it is unknown";
+
+      return;
+    }
 
     // If the resource provider is known we do not need to admit it
     // again, and the registrar operation implicitly succeeded.

http://git-wip-us.apache.org/repos/asf/mesos/blob/d7e21259/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 8c364a1..1ee4dc3 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -1150,6 +1150,67 @@ TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
 }
 
 
+// Test that when a resource provider attempts to resubscribe with an
+// unknown ID it is not admitted but disconnected.
+TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeUnknownID)
+{
+  Clock::pause();
+
+  // Start master and agent.
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // For the agent's resource provider manager to start,
+  // the agent needs to have been assigned an agent ID.
+  Future<SlaveRegisteredMessage> slaveRegisteredMessage =
+    FUTURE_PROTOBUF(SlaveRegisteredMessage(), _, _);
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(agent);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::settle();
+
+  AWAIT_READY(slaveRegisteredMessage);
+
+  mesos::v1::ResourceProviderID resourceProviderId;
+  resourceProviderId.set_value(id::UUID::random().toString());
+
+  mesos::v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.mutable_id()->CopyFrom(resourceProviderId);
+  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+  resourceProviderInfo.set_name("test");
+
+  Owned<v1::MockResourceProvider> resourceProvider(
+      new v1::MockResourceProvider(resourceProviderInfo));
+
+  // We explicitly reset the resource provider after the expected
+  // disconnect to prevent it from resubscribing indefinitely.
+  Future<Nothing> disconnected;
+  EXPECT_CALL(*resourceProvider, disconnected())
+    .WillOnce(DoAll(
+        Invoke([&resourceProvider]() { resourceProvider.reset(); }),
+        FutureSatisfy(&disconnected)));
+
+  // Start and register a resource provider.
+  Owned<EndpointDetector> endpointDetector(
+      resource_provider::createEndpointDetector(agent.get()->pid));
+
+  const ContentType contentType = GetParam();
+
+  resourceProvider->start(
+      endpointDetector,
+      contentType,
+      v1::DEFAULT_CREDENTIAL);
+
+  AWAIT_READY(disconnected);
+}
+
+
 // This test verifies that a disconnected resource provider will
 // result in an `UpdateSlaveMessage` to be sent to the master and the
 // total resources of the disconnected resource provider will be


[4/9] mesos git commit: Passed on registrar when constructing resource provider manager.

Posted by ch...@apache.org.
Passed on registrar when constructing resource provider manager.

In order to support recovering resource provider manager information
in the future, we need to adjust the construction of the manager to be
able to consume a registrar.

This patch lays the groundwork by adjusting interfaces and their
usage; we will make use of the passed on information in a following
patch.

Review: https://reviews.apache.org/r/66310/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/2d81d7ce
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/2d81d7ce
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/2d81d7ce

Branch: refs/heads/master
Commit: 2d81d7ce72f68ddc981c0d3c28f4f4c1654dd516
Parents: e286bbf
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue May 1 13:09:08 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue May 1 13:09:08 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             | 15 +++++++----
 src/resource_provider/manager.hpp             |  5 +++-
 src/resource_provider/registrar.cpp           |  5 +++-
 src/slave/slave.cpp                           | 31 +++++++++++++++++++++-
 src/tests/resource_provider_manager_tests.cpp | 28 ++++++++++++-------
 5 files changed, 67 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/2d81d7ce/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 4128808..67dbfbe 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -58,6 +58,7 @@ using mesos::internal::resource_provider::validation::call::validate;
 
 using mesos::resource_provider::Call;
 using mesos::resource_provider::Event;
+using mesos::resource_provider::Registrar;
 
 using process::Failure;
 using process::Future;
@@ -157,7 +158,7 @@ class ResourceProviderManagerProcess
   : public Process<ResourceProviderManagerProcess>
 {
 public:
-  ResourceProviderManagerProcess();
+  ResourceProviderManagerProcess(Owned<Registrar> _registrar);
 
   Future<http::Response> api(
       const http::Request& request,
@@ -212,9 +213,13 @@ private:
 };
 
 
-ResourceProviderManagerProcess::ResourceProviderManagerProcess()
+ResourceProviderManagerProcess::ResourceProviderManagerProcess(
+    Owned<Registrar> _registrar)
   : ProcessBase(process::ID::generate("resource-provider-manager")),
-    metrics(*this) {}
+    metrics(*this)
+{
+  CHECK_NOTNULL(_registrar.get());
+}
 
 
 Future<http::Response> ResourceProviderManagerProcess::api(
@@ -763,8 +768,8 @@ ResourceProviderManagerProcess::Metrics::~Metrics()
 }
 
 
-ResourceProviderManager::ResourceProviderManager()
-  : process(new ResourceProviderManagerProcess())
+ResourceProviderManager::ResourceProviderManager(Owned<Registrar> registrar)
+  : process(new ResourceProviderManagerProcess(std::move(registrar)))
 {
   spawn(CHECK_NOTNULL(process.get()));
 }

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d81d7ce/src/resource_provider/manager.hpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.hpp b/src/resource_provider/manager.hpp
index bc017fa..6c57956 100644
--- a/src/resource_provider/manager.hpp
+++ b/src/resource_provider/manager.hpp
@@ -26,6 +26,7 @@
 #include "messages/messages.hpp"
 
 #include "resource_provider/message.hpp"
+#include "resource_provider/registrar.hpp"
 
 namespace mesos {
 namespace internal {
@@ -37,7 +38,9 @@ class ResourceProviderManagerProcess;
 class ResourceProviderManager
 {
 public:
-  ResourceProviderManager();
+  ResourceProviderManager(
+      process::Owned<resource_provider::Registrar> registrar);
+
   ~ResourceProviderManager();
 
   ResourceProviderManager(

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d81d7ce/src/resource_provider/registrar.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/registrar.cpp b/src/resource_provider/registrar.cpp
index 53b403e..b151e2b 100644
--- a/src/resource_provider/registrar.cpp
+++ b/src/resource_provider/registrar.cpp
@@ -185,7 +185,10 @@ private:
 GenericRegistrarProcess::GenericRegistrarProcess(Owned<Storage> _storage)
   : ProcessBase(process::ID::generate("resource-provider-generic-registrar")),
     storage(std::move(_storage)),
-    state(storage.get()) {}
+    state(storage.get())
+{
+  CHECK_NOTNULL(storage.get());
+}
 
 
 Future<Nothing> GenericRegistrarProcess::recover()

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d81d7ce/src/slave/slave.cpp
----------------------------------------------------------------------
diff --git a/src/slave/slave.cpp b/src/slave/slave.cpp
index d313777..6ca3d79 100644
--- a/src/slave/slave.cpp
+++ b/src/slave/slave.cpp
@@ -38,6 +38,9 @@
 
 #include <mesos/module/authenticatee.hpp>
 
+#include <mesos/state/leveldb.hpp>
+#include <mesos/state/in_memory.hpp>
+
 #include <mesos/resource_provider/storage/disk_profile_adaptor.hpp>
 
 #include <process/after.hpp>
@@ -943,6 +946,10 @@ void Slave::finalize()
       shutdownFramework(UPID(), frameworkId);
     }
   }
+
+  // Explicitly tear down the resource provider manager to ensure that the
+  // wrapped process is terminated and releases the underlying storage.
+  resourceProviderManager.reset();
 }
 
 
@@ -8802,7 +8809,29 @@ void Slave::initializeResourceProviderManager(
     return;
   }
 
-  resourceProviderManager.reset(new ResourceProviderManager());
+  // The registrar uses LevelDB as underlying storage. Since LevelDB
+  // is currently not supported on Windows (see MESOS-5932), we fall
+  // back to in-memory storage there.
+  //
+  // TODO(bbannier): Remove this Windows workaround once MESOS-5932 is fixed.
+#ifndef __WINDOWS__
+  Owned<mesos::state::Storage> storage(new mesos::state::LevelDBStorage(
+      paths::getResourceProviderRegistryPath(flags.work_dir, slaveId)));
+#else
+  LOG(WARNING)
+    << "Persisting resource provider manager state is not supported on Windows";
+  Owned<mesos::state::Storage> storage(new mesos::state::InMemoryStorage());
+#endif // __WINDOWS__
+
+  Try<Owned<resource_provider::Registrar>> resourceProviderRegistrar =
+    resource_provider::Registrar::create(std::move(storage));
+
+  CHECK_SOME(resourceProviderRegistrar)
+    << "Could not construct resource provider registrar: "
+    << resourceProviderRegistrar.error();
+
+  resourceProviderManager.reset(
+      new ResourceProviderManager(std::move(resourceProviderRegistrar.get())));
 
   if (capabilities.resourceProvider) {
     // Start listening for messages from the resource provider manager.

http://git-wip-us.apache.org/repos/asf/mesos/blob/2d81d7ce/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index ceb7854..1664073 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -67,6 +67,7 @@ using mesos::master::detector::MasterDetector;
 
 using mesos::state::InMemoryStorage;
 using mesos::state::State;
+using mesos::state::Storage;
 
 using mesos::resource_provider::AdmitResourceProvider;
 using mesos::resource_provider::Registrar;
@@ -129,7 +130,8 @@ TEST_F(ResourceProviderManagerHttpApiTest, NoContentType)
   request.method = "POST";
   request.headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Future<http::Response> response = manager.api(request, None());
 
@@ -154,7 +156,8 @@ TEST_F(ResourceProviderManagerHttpApiTest, ValidJsonButInvalidProtobuf)
   request.headers["Content-Type"] = APPLICATION_JSON;
   request.body = stringify(object);
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Future<http::Response> response = manager.api(request, None());
 
@@ -177,7 +180,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, MalformedContent)
   request.headers["Content-Type"] = stringify(contentType);
   request.body = "MALFORMED_CONTENT";
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Future<http::Response> response = manager.api(request, None());
 
@@ -223,7 +227,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, UnsupportedContentMediaType)
   request.headers["Content-Type"] = unknownMediaType;
   request.body = serialize(contentType, call);
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Future<http::Response> response = manager.api(request, None());
 
@@ -235,7 +240,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateState)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Option<id::UUID> streamId;
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -342,7 +348,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, UpdateOperationStatus)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Option<id::UUID> streamId;
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -460,7 +467,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesSuccess)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Option<id::UUID> streamId;
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -567,7 +575,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesFailure)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Option<id::UUID> streamId;
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
@@ -674,7 +683,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, PublishResourcesDisconnected)
 {
   const ContentType contentType = GetParam();
 
-  ResourceProviderManager manager;
+  ResourceProviderManager manager(
+      Registrar::create(Owned<Storage>(new InMemoryStorage)).get());
 
   Option<mesos::v1::ResourceProviderID> resourceProviderId;
   Option<http::Pipe::Reader> reader;


[6/9] mesos git commit: Remembered recovered and subscribed providers in ephemeral state.

Posted by ch...@apache.org.
Remembered recovered and subscribed providers in ephemeral state.

This patch adds a data structure to keep track of subscribed and
recovered resource providers in the ephemeral state of the resource
provider manager.

Review: https://reviews.apache.org/r/66544/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9a8cef07
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9a8cef07
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9a8cef07

Branch: refs/heads/master
Commit: 9a8cef0739446925a0719abf51758ea7443c8990
Parents: 169efe6
Author: Benjamin Bannier <be...@mesosphere.io>
Authored: Tue May 1 13:09:17 2018 -0700
Committer: Chun-Hung Hsiao <ch...@mesosphere.io>
Committed: Tue May 1 13:09:17 2018 -0700

----------------------------------------------------------------------
 src/resource_provider/manager.cpp | 22 ++++++++++++++++++++++
 1 file changed, 22 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9a8cef07/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index dfb8e73..bc52741 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -208,6 +208,10 @@ private:
   struct ResourceProviders
   {
     hashmap<ResourceProviderID, Owned<ResourceProvider>> subscribed;
+    hashmap<
+        ResourceProviderID,
+        mesos::resource_provider::registry::ResourceProvider>
+      known;
   } resourceProviders;
 
   struct Metrics
@@ -253,6 +257,13 @@ void ResourceProviderManagerProcess::initialize()
 Future<Nothing> ResourceProviderManagerProcess::recover(
     const mesos::resource_provider::registry::Registry& registry)
 {
+  foreach (
+      const mesos::resource_provider::registry::ResourceProvider&
+        resourceProvider,
+      registry.resource_providers()) {
+    resourceProviders.known.put(resourceProvider.id(), resourceProvider);
+  }
+
   recovered.set(Nothing());
 
   return Nothing();
@@ -697,6 +708,17 @@ void ResourceProviderManagerProcess::subscribe(
   resourceProviders.subscribed.put(
       resourceProviderId,
       std::move(resourceProvider));
+
+  if (!resourceProviders.known.contains(resourceProviderId)) {
+    mesos::resource_provider::registry::ResourceProvider resourceProvider_;
+    resourceProvider_.mutable_id()->CopyFrom(resourceProviderId);
+
+    resourceProviders.known.put(
+        resourceProviderId,
+        std::move(resourceProvider_));
+
+    // TODO(bbannier): Persist this information in the registry.
+  }
 }