You are viewing a plain-text version of this content. The link to the canonical version ("here") was not preserved in this plain-text copy.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/12/01 11:12:10 UTC

[2/2] mesos git commit: Allowed resubscription of resource providers.

Allowed resubscription of resource providers.

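A resource provider can resubscribe by including the resource provider
ID it was assigned in the 'ResourceProviderInfo' of its 'SUBSCRIBE'
call. Resubscription is necessary when either the resource provider or
the resource provider manager (i.e., the agent) has failed over. If a
provider with that ID is still subscribed, its old connection is closed
and replaced by the new one; otherwise (e.g., after an agent failover)
the provider is registered again under its existing ID. In both cases
the manager replies with a 'SUBSCRIBED' event carrying that ID.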

Review: https://reviews.apache.org/r/64065/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9ab950d3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9ab950d3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9ab950d3

Branch: refs/heads/master
Commit: 9ab950d38cedd40c90ffc81305cae372f1d80e87
Parents: 1c92058
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Fri Dec 1 10:03:36 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 1 12:01:57 2017 +0100

----------------------------------------------------------------------
 src/resource_provider/manager.cpp             |  47 +++---
 src/tests/mesos.hpp                           |  32 +++--
 src/tests/resource_provider_manager_tests.cpp | 158 ++++++++++++++++++---
 src/tests/slave_tests.cpp                     |  29 ++--
 4 files changed, 205 insertions(+), 61 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9ab950d3/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 5fdce7f..c37553d 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -385,34 +385,43 @@ void ResourceProviderManagerProcess::subscribe(
 
   LOG(INFO) << "Subscribing resource provider " << resourceProviderInfo;
 
+  ResourceProvider resourceProvider(resourceProviderInfo, http);
+
   if (!resourceProviderInfo.has_id()) {
     // The resource provider is subscribing for the first time.
-    resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId());
-
-    ResourceProvider resourceProvider(resourceProviderInfo, http);
-
-    Event event;
-    event.set_type(Event::SUBSCRIBED);
-    event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
-        resourceProvider.info.id());
-
-    if (!resourceProvider.http.send(event)) {
-      LOG(WARNING) << "Failed to send SUBSCRIBED event to resource provider "
-                   << resourceProvider.info.id() << ": connection closed";
-    }
+    resourceProvider.info.mutable_id()->CopyFrom(newResourceProviderId());
 
     // TODO(jieyu): Start heartbeat for the resource provider.
 
     resourceProviders.subscribed.put(
-        resourceProviderInfo.id(),
+        resourceProvider.info.id(),
         resourceProvider);
-
-    return;
+  } else {
+    if (resourceProviders.subscribed.contains(resourceProviderInfo.id())) {
+      // Resource provider is resubscribing after failing over.
+      // TODO(nfnt): Test if old and new 'ResourceProviderInfo' match.
+      ResourceProvider& _resourceProvider =
+        resourceProviders.subscribed.at(resourceProviderInfo.id());
+
+      _resourceProvider.http.close();
+      _resourceProvider.http = http;
+    } else {
+      // Resource provider is resubscribing after an agent failover.
+      resourceProviders.subscribed.put(
+          resourceProviderInfo.id(), resourceProvider);
+    }
   }
 
-  // TODO(chhsiao): Reject the subscription if it contains an unknown
-  // ID or there is already a subscribed instance with the same ID,
-  // and add tests for re-subscriptions.
+  Event event;
+  event.set_type(Event::SUBSCRIBED);
+  event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
+      resourceProvider.info.id());
+
+  if (!resourceProviders.subscribed.at(resourceProvider.info.id()).http.send(
+          event)) {
+    LOG(WARNING) << "Failed to send SUBSCRIBED event to resource provider "
+                 << resourceProvider.info.id() << ": connection closed";
+  }
 }
 
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/9ab950d3/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 99542c5..aa2571f 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -2778,6 +2778,7 @@ template <
     typename Event,
     typename Call,
     typename Driver,
+    typename ResourceProviderInfo,
     typename Resource,
     typename Resources,
     typename ResourceProviderID,
@@ -2787,8 +2788,11 @@ template <
 class MockResourceProvider
 {
 public:
-  MockResourceProvider(const Option<Resources>& _resources = None())
-    : resources(_resources)
+  MockResourceProvider(
+      const ResourceProviderInfo& _info,
+      const Option<Resources>& _resources = None())
+    : info(_info),
+      resources(_resources)
   {
     ON_CALL(*this, connected())
       .WillByDefault(Invoke(
@@ -2797,6 +2801,7 @@ public:
               Event,
               Call,
               Driver,
+              ResourceProviderInfo,
               Resource,
               Resources,
               ResourceProviderID,
@@ -2812,6 +2817,7 @@ public:
               Event,
               Call,
               Driver,
+              ResourceProviderInfo,
               Resource,
               Resources,
               ResourceProviderID,
@@ -2827,6 +2833,7 @@ public:
               Event,
               Call,
               Driver,
+              ResourceProviderInfo,
               Resource,
               Resources,
               ResourceProviderID,
@@ -2884,6 +2891,7 @@ public:
                 Event,
                 Call,
                 Driver,
+                ResourceProviderInfo,
                 Resource,
                 Resources,
                 ResourceProviderID,
@@ -2896,6 +2904,7 @@ public:
                 Event,
                 Call,
                 Driver,
+                ResourceProviderInfo,
                 Resource,
                 Resources,
                 ResourceProviderID,
@@ -2908,6 +2917,7 @@ public:
                 Event,
                 Call,
                 Driver,
+                ResourceProviderInfo,
                 Resource,
                 Resources,
                 ResourceProviderID,
@@ -2923,29 +2933,26 @@ public:
   {
     Call call;
     call.set_type(Call::SUBSCRIBE);
-    call.mutable_subscribe()->mutable_resource_provider_info()->set_type(
-        "org.apache.mesos.rp.test");
-    call.mutable_subscribe()->mutable_resource_provider_info()->set_name(
-        "test");
+    call.mutable_subscribe()->mutable_resource_provider_info()->CopyFrom(info);
 
     driver->send(call);
   }
 
   void subscribedDefault(const typename Event::Subscribed& subscribed)
   {
-    resourceProviderId = subscribed.provider_id();
+    info.mutable_id()->CopyFrom(subscribed.provider_id());
 
     if (resources.isSome()) {
       Resources injected;
 
       foreach (Resource resource, resources.get()) {
-        resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
+        resource.mutable_provider_id()->CopyFrom(info.id());
         injected += resource;
       }
 
       Call call;
       call.set_type(Call::UPDATE_STATE);
-      call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
+      call.mutable_resource_provider_id()->CopyFrom(info.id());
 
       typename Call::UpdateState* update = call.mutable_update_state();
       update->mutable_resources()->CopyFrom(injected);
@@ -2957,9 +2964,11 @@ public:
 
   void operationDefault(const typename Event::Operation& operation)
   {
+    CHECK(info.has_id());
+
     Call call;
     call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
-    call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
+    call.mutable_resource_provider_id()->CopyFrom(info.id());
 
     typename Call::UpdateOfferOperationStatus* update =
       call.mutable_update_offer_operation_status();
@@ -3030,7 +3039,7 @@ public:
     driver->send(call);
   }
 
-  Option<ResourceProviderID> resourceProviderId;
+  ResourceProviderInfo info;
 
 private:
   Option<Resources> resources;
@@ -3054,6 +3063,7 @@ using MockResourceProvider = tests::resource_provider::MockResourceProvider<
     mesos::v1::resource_provider::Event,
     mesos::v1::resource_provider::Call,
     mesos::v1::resource_provider::Driver,
+    mesos::v1::ResourceProviderInfo,
     mesos::v1::Resource,
     mesos::v1::Resources,
     mesos::v1::ResourceProviderID,

http://git-wip-us.apache.org/repos/asf/mesos/blob/9ab950d3/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 0b7c4ad..5764102 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -101,7 +101,31 @@ namespace tests {
 
 class ResourceProviderManagerHttpApiTest
   : public MesosTest,
-    public WithParamInterface<ContentType> {};
+    public WithParamInterface<ContentType>
+{
+public:
+  slave::Flags CreateSlaveFlags() override
+  {
+    slave::Flags slaveFlags = MesosTest::CreateSlaveFlags();
+
+    slaveFlags.authenticate_http_readwrite = false;
+
+    constexpr SlaveInfo::Capability::Type capabilities[] = {
+      SlaveInfo::Capability::MULTI_ROLE,
+      SlaveInfo::Capability::HIERARCHICAL_ROLE,
+      SlaveInfo::Capability::RESERVATION_REFINEMENT,
+      SlaveInfo::Capability::RESOURCE_PROVIDER};
+
+    slaveFlags.agent_features = SlaveCapabilities();
+    foreach (SlaveInfo::Capability::Type type, capabilities) {
+      SlaveInfo::Capability* capability =
+        slaveFlags.agent_features->add_capabilities();
+      capability->set_type(type);
+    }
+
+    return slaveFlags;
+  }
+};
 
 
 // The tests are parameterized by the content type of the request.
@@ -596,26 +620,9 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
 
   Owned<MasterDetector> detector = master.get()->createDetector();
 
-  slave::Flags slaveFlags = CreateSlaveFlags();
-  slaveFlags.authenticate_http_readwrite = false;
-
   Future<UpdateSlaveMessage> updateSlaveMessage =
     FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
 
-  // Set the resource provider capability and other required capabilities.
-  constexpr SlaveInfo::Capability::Type capabilities[] = {
-    SlaveInfo::Capability::MULTI_ROLE,
-    SlaveInfo::Capability::HIERARCHICAL_ROLE,
-    SlaveInfo::Capability::RESERVATION_REFINEMENT,
-    SlaveInfo::Capability::RESOURCE_PROVIDER};
-
-  slaveFlags.agent_features = SlaveCapabilities();
-  foreach (SlaveInfo::Capability::Type type, capabilities) {
-    SlaveInfo::Capability* capability =
-      slaveFlags.agent_features->add_capabilities();
-    capability->set_type(type);
-  }
-
   // Pause the clock and control it manually in order to
   // control the timing of the registration. A registration timeout
   // would trigger multiple registration attempts. As a result, multiple
@@ -624,6 +631,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
   // resource provider registration.
   Clock::pause();
 
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
   Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(agent);
 
@@ -638,7 +647,12 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
 
   Clock::resume();
 
-  v1::MockResourceProvider resourceProvider(Some(v1::Resources(disk)));
+  mesos::v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+  resourceProviderInfo.set_name("test");
+
+  v1::MockResourceProvider resourceProvider(
+      resourceProviderInfo, Some(v1::Resources(disk)));
 
   // Start and register a resource provider.
   string scheme = "http";
@@ -757,6 +771,112 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
   EXPECT_FALSE(block->reservations().empty());
 }
 
+
+// Test that resource provider can resubscribe with an agent after
+// a resource provider failover as well as an agent failover.
+TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
+{
+  Clock::pause();
+
+  // Start master and agent.
+  master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Owned<MasterDetector> detector = master.get()->createDetector();
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(agent);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::settle();
+  AWAIT_READY(updateSlaveMessage);
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  mesos::v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+  resourceProviderInfo.set_name("test");
+
+  v1::Resource disk = v1::createDiskResource(
+      "200", "*", None(), None(), v1::createDiskSourceRaw());
+
+  v1::MockResourceProvider resourceProvider(
+      resourceProviderInfo, Some(v1::Resources(disk)));
+
+  // Start and register a resource provider.
+  string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+  if (process::network::openssl::flags().enabled) {
+    scheme = "https";
+  }
+#endif
+
+  http::URL url(
+      scheme,
+      agent.get()->pid.address.ip,
+      agent.get()->pid.address.port,
+      agent.get()->pid.id + "/api/v1/resource_provider");
+
+  Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+  const ContentType contentType = GetParam();
+
+  resourceProvider.start(endpointDetector, contentType, v1::DEFAULT_CREDENTIAL);
+
+  // Wait until the agent's resources have been updated to include the
+  // resource provider resources. At this point the resource provider
+  // will have an ID assigned by the agent.
+  AWAIT_READY(updateSlaveMessage);
+
+  mesos::v1::ResourceProviderID resourceProviderId = resourceProvider.info.id();
+
+  Future<Event::Subscribed> subscribed1;
+  EXPECT_CALL(resourceProvider, subscribed(_))
+    .WillOnce(FutureArg<0>(&subscribed1));
+
+  // Resource provider failover by opening a new connection.
+  // The assigned resource provider ID will be used to resubscribe.
+  resourceProvider.start(endpointDetector, contentType, v1::DEFAULT_CREDENTIAL);
+
+  AWAIT_READY(subscribed1);
+  EXPECT_EQ(resourceProviderId, subscribed1->provider_id());
+
+  Future<Event::Subscribed> subscribed2;
+  EXPECT_CALL(resourceProvider, subscribed(_))
+    .WillOnce(FutureArg<0>(&subscribed2));
+
+  Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
+
+  // The agent failover.
+  agent->reset();
+  agent = StartSlave(detector.get(), slaveFlags);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::settle();
+
+  AWAIT_READY(__recover);
+
+  url = http::URL(
+      scheme,
+      agent.get()->pid.address.ip,
+      agent.get()->pid.address.port,
+      agent.get()->pid.id + "/api/v1/resource_provider");
+
+  endpointDetector.reset(new ConstantEndpointDetector(url));
+
+  resourceProvider.start(endpointDetector, contentType, v1::DEFAULT_CREDENTIAL);
+
+  AWAIT_READY(subscribed2);
+  EXPECT_EQ(resourceProviderId, subscribed2->provider_id());
+}
+
 } // namespace tests {
 } // namespace internal {
 } // namespace mesos {

http://git-wip-us.apache.org/repos/asf/mesos/blob/9ab950d3/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 1344e0a..8ab63ac 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -8655,8 +8655,12 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
   Clock::advance(slaveFlags.registration_backoff_factor);
   AWAIT_READY(slaveRegisteredMessage);
 
+  mesos::v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.resource_provider.test");
+  resourceProviderInfo.set_name("test");
+
   // Register a local resource provider with the agent.
-  v1::MockResourceProvider resourceProvider;
+  v1::MockResourceProvider resourceProvider(resourceProviderInfo);
 
   Future<Nothing> connected;
   EXPECT_CALL(resourceProvider, connected())
@@ -8696,11 +8700,8 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
     mesos::v1::resource_provider::Call call;
     call.set_type(mesos::v1::resource_provider::Call::SUBSCRIBE);
 
-    mesos::v1::ResourceProviderInfo* info =
-      call.mutable_subscribe()->mutable_resource_provider_info();
-
-    info->set_type("org.apache.mesos.resource_provider.test");
-    info->set_name("test");
+    call.mutable_subscribe()->mutable_resource_provider_info()->CopyFrom(
+        resourceProviderInfo);
 
     resourceProvider.send(call);
   }
@@ -8864,6 +8865,10 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
 
   AWAIT_READY(updateSlaveMessage);
 
+  mesos::v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.resource_provider.test");
+  resourceProviderInfo.set_name("test");
+
   // Register a resource provider with the agent.
   v1::Resources resourceProviderResources = v1::createDiskResource(
       "200",
@@ -8872,7 +8877,9 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
       None(),
       v1::createDiskSourceRaw());
 
-  v1::MockResourceProvider resourceProvider(resourceProviderResources);
+  v1::MockResourceProvider resourceProvider(
+      resourceProviderInfo,
+      resourceProviderResources);
 
   string scheme = "http";
 
@@ -8980,12 +8987,11 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
   // Fail the operation in the resource provider. This should trigger
   // an `UpdateSlaveMessage` to the master.
   {
-    CHECK_SOME(resourceProvider.resourceProviderId);
+    ASSERT_TRUE(resourceProvider.info.has_id());
 
     v1::Resources resourceProviderResources_;
     foreach (v1::Resource resource, resourceProviderResources) {
-      resource.mutable_provider_id()->CopyFrom(
-          resourceProvider.resourceProviderId.get());
+      resource.mutable_provider_id()->CopyFrom(resourceProvider.info.id());
 
       resourceProviderResources_ += resource;
     }
@@ -8996,8 +9002,7 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
     v1::resource_provider::Call call;
 
     call.set_type(v1::resource_provider::Call::UPDATE_STATE);
-    call.mutable_resource_provider_id()->CopyFrom(
-        resourceProvider.resourceProviderId.get());
+    call.mutable_resource_provider_id()->CopyFrom(resourceProvider.info.id());
 
     v1::resource_provider::Call::UpdateState* updateState =
       call.mutable_update_state();