You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2017/12/01 11:12:10 UTC
[2/2] mesos git commit: Allowed resubscription of resource providers.
Allowed resubscription of resource providers.
A resource provider can resubscribe by sending a 'SUBSCRIBE' call whose
'ResourceProviderInfo' includes the resource provider ID it was
previously assigned. A resubscription is necessary if either the resource
provider or the resource provider manager (i.e. an agent) has failed over.
Review: https://reviews.apache.org/r/64065/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9ab950d3
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9ab950d3
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9ab950d3
Branch: refs/heads/master
Commit: 9ab950d38cedd40c90ffc81305cae372f1d80e87
Parents: 1c92058
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Fri Dec 1 10:03:36 2017 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Fri Dec 1 12:01:57 2017 +0100
----------------------------------------------------------------------
src/resource_provider/manager.cpp | 47 +++---
src/tests/mesos.hpp | 32 +++--
src/tests/resource_provider_manager_tests.cpp | 158 ++++++++++++++++++---
src/tests/slave_tests.cpp | 29 ++--
4 files changed, 205 insertions(+), 61 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9ab950d3/src/resource_provider/manager.cpp
----------------------------------------------------------------------
diff --git a/src/resource_provider/manager.cpp b/src/resource_provider/manager.cpp
index 5fdce7f..c37553d 100644
--- a/src/resource_provider/manager.cpp
+++ b/src/resource_provider/manager.cpp
@@ -385,34 +385,43 @@ void ResourceProviderManagerProcess::subscribe(
LOG(INFO) << "Subscribing resource provider " << resourceProviderInfo;
+ ResourceProvider resourceProvider(resourceProviderInfo, http);
+
if (!resourceProviderInfo.has_id()) {
// The resource provider is subscribing for the first time.
- resourceProviderInfo.mutable_id()->CopyFrom(newResourceProviderId());
-
- ResourceProvider resourceProvider(resourceProviderInfo, http);
-
- Event event;
- event.set_type(Event::SUBSCRIBED);
- event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
- resourceProvider.info.id());
-
- if (!resourceProvider.http.send(event)) {
- LOG(WARNING) << "Failed to send SUBSCRIBED event to resource provider "
- << resourceProvider.info.id() << ": connection closed";
- }
+ resourceProvider.info.mutable_id()->CopyFrom(newResourceProviderId());
// TODO(jieyu): Start heartbeat for the resource provider.
resourceProviders.subscribed.put(
- resourceProviderInfo.id(),
+ resourceProvider.info.id(),
resourceProvider);
-
- return;
+ } else {
+ if (resourceProviders.subscribed.contains(resourceProviderInfo.id())) {
+ // Resource provider is resubscribing after failing over.
+ // TODO(nfnt): Test if old and new 'ResourceProviderInfo' match.
+ ResourceProvider& _resourceProvider =
+ resourceProviders.subscribed.at(resourceProviderInfo.id());
+
+ _resourceProvider.http.close();
+ _resourceProvider.http = http;
+ } else {
+ // Resource provider is resubscribing after an agent failover.
+ resourceProviders.subscribed.put(
+ resourceProviderInfo.id(), resourceProvider);
+ }
}
- // TODO(chhsiao): Reject the subscription if it contains an unknown
- // ID or there is already a subscribed instance with the same ID,
- // and add tests for re-subscriptions.
+ Event event;
+ event.set_type(Event::SUBSCRIBED);
+ event.mutable_subscribed()->mutable_provider_id()->CopyFrom(
+ resourceProvider.info.id());
+
+ if (!resourceProviders.subscribed.at(resourceProvider.info.id()).http.send(
+ event)) {
+ LOG(WARNING) << "Failed to send SUBSCRIBED event to resource provider "
+ << resourceProvider.info.id() << ": connection closed";
+ }
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/9ab950d3/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 99542c5..aa2571f 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -2778,6 +2778,7 @@ template <
typename Event,
typename Call,
typename Driver,
+ typename ResourceProviderInfo,
typename Resource,
typename Resources,
typename ResourceProviderID,
@@ -2787,8 +2788,11 @@ template <
class MockResourceProvider
{
public:
- MockResourceProvider(const Option<Resources>& _resources = None())
- : resources(_resources)
+ MockResourceProvider(
+ const ResourceProviderInfo& _info,
+ const Option<Resources>& _resources = None())
+ : info(_info),
+ resources(_resources)
{
ON_CALL(*this, connected())
.WillByDefault(Invoke(
@@ -2797,6 +2801,7 @@ public:
Event,
Call,
Driver,
+ ResourceProviderInfo,
Resource,
Resources,
ResourceProviderID,
@@ -2812,6 +2817,7 @@ public:
Event,
Call,
Driver,
+ ResourceProviderInfo,
Resource,
Resources,
ResourceProviderID,
@@ -2827,6 +2833,7 @@ public:
Event,
Call,
Driver,
+ ResourceProviderInfo,
Resource,
Resources,
ResourceProviderID,
@@ -2884,6 +2891,7 @@ public:
Event,
Call,
Driver,
+ ResourceProviderInfo,
Resource,
Resources,
ResourceProviderID,
@@ -2896,6 +2904,7 @@ public:
Event,
Call,
Driver,
+ ResourceProviderInfo,
Resource,
Resources,
ResourceProviderID,
@@ -2908,6 +2917,7 @@ public:
Event,
Call,
Driver,
+ ResourceProviderInfo,
Resource,
Resources,
ResourceProviderID,
@@ -2923,29 +2933,26 @@ public:
{
Call call;
call.set_type(Call::SUBSCRIBE);
- call.mutable_subscribe()->mutable_resource_provider_info()->set_type(
- "org.apache.mesos.rp.test");
- call.mutable_subscribe()->mutable_resource_provider_info()->set_name(
- "test");
+ call.mutable_subscribe()->mutable_resource_provider_info()->CopyFrom(info);
driver->send(call);
}
void subscribedDefault(const typename Event::Subscribed& subscribed)
{
- resourceProviderId = subscribed.provider_id();
+ info.mutable_id()->CopyFrom(subscribed.provider_id());
if (resources.isSome()) {
Resources injected;
foreach (Resource resource, resources.get()) {
- resource.mutable_provider_id()->CopyFrom(resourceProviderId.get());
+ resource.mutable_provider_id()->CopyFrom(info.id());
injected += resource;
}
Call call;
call.set_type(Call::UPDATE_STATE);
- call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
+ call.mutable_resource_provider_id()->CopyFrom(info.id());
typename Call::UpdateState* update = call.mutable_update_state();
update->mutable_resources()->CopyFrom(injected);
@@ -2957,9 +2964,11 @@ public:
void operationDefault(const typename Event::Operation& operation)
{
+ CHECK(info.has_id());
+
Call call;
call.set_type(Call::UPDATE_OFFER_OPERATION_STATUS);
- call.mutable_resource_provider_id()->CopyFrom(resourceProviderId.get());
+ call.mutable_resource_provider_id()->CopyFrom(info.id());
typename Call::UpdateOfferOperationStatus* update =
call.mutable_update_offer_operation_status();
@@ -3030,7 +3039,7 @@ public:
driver->send(call);
}
- Option<ResourceProviderID> resourceProviderId;
+ ResourceProviderInfo info;
private:
Option<Resources> resources;
@@ -3054,6 +3063,7 @@ using MockResourceProvider = tests::resource_provider::MockResourceProvider<
mesos::v1::resource_provider::Event,
mesos::v1::resource_provider::Call,
mesos::v1::resource_provider::Driver,
+ mesos::v1::ResourceProviderInfo,
mesos::v1::Resource,
mesos::v1::Resources,
mesos::v1::ResourceProviderID,
http://git-wip-us.apache.org/repos/asf/mesos/blob/9ab950d3/src/tests/resource_provider_manager_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/resource_provider_manager_tests.cpp b/src/tests/resource_provider_manager_tests.cpp
index 0b7c4ad..5764102 100644
--- a/src/tests/resource_provider_manager_tests.cpp
+++ b/src/tests/resource_provider_manager_tests.cpp
@@ -101,7 +101,31 @@ namespace tests {
class ResourceProviderManagerHttpApiTest
: public MesosTest,
- public WithParamInterface<ContentType> {};
+ public WithParamInterface<ContentType>
+{
+public:
+ slave::Flags CreateSlaveFlags() override
+ {
+ slave::Flags slaveFlags = MesosTest::CreateSlaveFlags();
+
+ slaveFlags.authenticate_http_readwrite = false;
+
+ constexpr SlaveInfo::Capability::Type capabilities[] = {
+ SlaveInfo::Capability::MULTI_ROLE,
+ SlaveInfo::Capability::HIERARCHICAL_ROLE,
+ SlaveInfo::Capability::RESERVATION_REFINEMENT,
+ SlaveInfo::Capability::RESOURCE_PROVIDER};
+
+ slaveFlags.agent_features = SlaveCapabilities();
+ foreach (SlaveInfo::Capability::Type type, capabilities) {
+ SlaveInfo::Capability* capability =
+ slaveFlags.agent_features->add_capabilities();
+ capability->set_type(type);
+ }
+
+ return slaveFlags;
+ }
+};
// The tests are parameterized by the content type of the request.
@@ -596,26 +620,9 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
Owned<MasterDetector> detector = master.get()->createDetector();
- slave::Flags slaveFlags = CreateSlaveFlags();
- slaveFlags.authenticate_http_readwrite = false;
-
Future<UpdateSlaveMessage> updateSlaveMessage =
FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
- // Set the resource provider capability and other required capabilities.
- constexpr SlaveInfo::Capability::Type capabilities[] = {
- SlaveInfo::Capability::MULTI_ROLE,
- SlaveInfo::Capability::HIERARCHICAL_ROLE,
- SlaveInfo::Capability::RESERVATION_REFINEMENT,
- SlaveInfo::Capability::RESOURCE_PROVIDER};
-
- slaveFlags.agent_features = SlaveCapabilities();
- foreach (SlaveInfo::Capability::Type type, capabilities) {
- SlaveInfo::Capability* capability =
- slaveFlags.agent_features->add_capabilities();
- capability->set_type(type);
- }
-
// Pause the clock and control it manually in order to
// control the timing of the registration. A registration timeout
// would trigger multiple registration attempts. As a result, multiple
@@ -624,6 +631,8 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
// resource provider registration.
Clock::pause();
+ slave::Flags slaveFlags = CreateSlaveFlags();
+
Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(agent);
@@ -638,7 +647,12 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
Clock::resume();
- v1::MockResourceProvider resourceProvider(Some(v1::Resources(disk)));
+ mesos::v1::ResourceProviderInfo resourceProviderInfo;
+ resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+ resourceProviderInfo.set_name("test");
+
+ v1::MockResourceProvider resourceProvider(
+ resourceProviderInfo, Some(v1::Resources(disk)));
// Start and register a resource provider.
string scheme = "http";
@@ -757,6 +771,112 @@ TEST_P(ResourceProviderManagerHttpApiTest, ConvertResources)
EXPECT_FALSE(block->reservations().empty());
}
+
+// Test that resource provider can resubscribe with an agent after
+// a resource provider failover as well as an agent failover.
+TEST_P(ResourceProviderManagerHttpApiTest, ResubscribeResourceProvider)
+{
+ Clock::pause();
+
+ // Start master and agent.
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Future<UpdateSlaveMessage> updateSlaveMessage =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+
+ Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(agent);
+
+ Clock::advance(slaveFlags.registration_backoff_factor);
+ Clock::settle();
+ AWAIT_READY(updateSlaveMessage);
+
+ updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ mesos::v1::ResourceProviderInfo resourceProviderInfo;
+ resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+ resourceProviderInfo.set_name("test");
+
+ v1::Resource disk = v1::createDiskResource(
+ "200", "*", None(), None(), v1::createDiskSourceRaw());
+
+ v1::MockResourceProvider resourceProvider(
+ resourceProviderInfo, Some(v1::Resources(disk)));
+
+ // Start and register a resource provider.
+ string scheme = "http";
+
+#ifdef USE_SSL_SOCKET
+ if (process::network::openssl::flags().enabled) {
+ scheme = "https";
+ }
+#endif
+
+ http::URL url(
+ scheme,
+ agent.get()->pid.address.ip,
+ agent.get()->pid.address.port,
+ agent.get()->pid.id + "/api/v1/resource_provider");
+
+ Owned<EndpointDetector> endpointDetector(new ConstantEndpointDetector(url));
+
+ const ContentType contentType = GetParam();
+
+ resourceProvider.start(endpointDetector, contentType, v1::DEFAULT_CREDENTIAL);
+
+ // Wait until the agent's resources have been updated to include the
+ // resource provider resources. At this point the resource provider
+ // will have an ID assigned by the agent.
+ AWAIT_READY(updateSlaveMessage);
+
+ mesos::v1::ResourceProviderID resourceProviderId = resourceProvider.info.id();
+
+ Future<Event::Subscribed> subscribed1;
+ EXPECT_CALL(resourceProvider, subscribed(_))
+ .WillOnce(FutureArg<0>(&subscribed1));
+
+ // Resource provider failover by opening a new connection.
+ // The assigned resource provider ID will be used to resubscribe.
+ resourceProvider.start(endpointDetector, contentType, v1::DEFAULT_CREDENTIAL);
+
+ AWAIT_READY(subscribed1);
+ EXPECT_EQ(resourceProviderId, subscribed1->provider_id());
+
+ Future<Event::Subscribed> subscribed2;
+ EXPECT_CALL(resourceProvider, subscribed(_))
+ .WillOnce(FutureArg<0>(&subscribed2));
+
+ Future<Nothing> __recover = FUTURE_DISPATCH(_, &Slave::__recover);
+
+ // The agent failover.
+ agent->reset();
+ agent = StartSlave(detector.get(), slaveFlags);
+
+ Clock::advance(slaveFlags.registration_backoff_factor);
+ Clock::settle();
+
+ AWAIT_READY(__recover);
+
+ url = http::URL(
+ scheme,
+ agent.get()->pid.address.ip,
+ agent.get()->pid.address.port,
+ agent.get()->pid.id + "/api/v1/resource_provider");
+
+ endpointDetector.reset(new ConstantEndpointDetector(url));
+
+ resourceProvider.start(endpointDetector, contentType, v1::DEFAULT_CREDENTIAL);
+
+ AWAIT_READY(subscribed2);
+ EXPECT_EQ(resourceProviderId, subscribed2->provider_id());
+}
+
} // namespace tests {
} // namespace internal {
} // namespace mesos {
http://git-wip-us.apache.org/repos/asf/mesos/blob/9ab950d3/src/tests/slave_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/slave_tests.cpp b/src/tests/slave_tests.cpp
index 1344e0a..8ab63ac 100644
--- a/src/tests/slave_tests.cpp
+++ b/src/tests/slave_tests.cpp
@@ -8655,8 +8655,12 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
Clock::advance(slaveFlags.registration_backoff_factor);
AWAIT_READY(slaveRegisteredMessage);
+ mesos::v1::ResourceProviderInfo resourceProviderInfo;
+ resourceProviderInfo.set_type("org.apache.mesos.resource_provider.test");
+ resourceProviderInfo.set_name("test");
+
// Register a local resource provider with the agent.
- v1::MockResourceProvider resourceProvider;
+ v1::MockResourceProvider resourceProvider(resourceProviderInfo);
Future<Nothing> connected;
EXPECT_CALL(resourceProvider, connected())
@@ -8696,11 +8700,8 @@ TEST_F(SlaveTest, ResourceProviderSubscribe)
mesos::v1::resource_provider::Call call;
call.set_type(mesos::v1::resource_provider::Call::SUBSCRIBE);
- mesos::v1::ResourceProviderInfo* info =
- call.mutable_subscribe()->mutable_resource_provider_info();
-
- info->set_type("org.apache.mesos.resource_provider.test");
- info->set_name("test");
+ call.mutable_subscribe()->mutable_resource_provider_info()->CopyFrom(
+ resourceProviderInfo);
resourceProvider.send(call);
}
@@ -8864,6 +8865,10 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
AWAIT_READY(updateSlaveMessage);
+ mesos::v1::ResourceProviderInfo resourceProviderInfo;
+ resourceProviderInfo.set_type("org.apache.mesos.resource_provider.test");
+ resourceProviderInfo.set_name("test");
+
// Register a resource provider with the agent.
v1::Resources resourceProviderResources = v1::createDiskResource(
"200",
@@ -8872,7 +8877,9 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
None(),
v1::createDiskSourceRaw());
- v1::MockResourceProvider resourceProvider(resourceProviderResources);
+ v1::MockResourceProvider resourceProvider(
+ resourceProviderInfo,
+ resourceProviderResources);
string scheme = "http";
@@ -8980,12 +8987,11 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
// Fail the operation in the resource provider. This should trigger
// an `UpdateSlaveMessage` to the master.
{
- CHECK_SOME(resourceProvider.resourceProviderId);
+ ASSERT_TRUE(resourceProvider.info.has_id());
v1::Resources resourceProviderResources_;
foreach (v1::Resource resource, resourceProviderResources) {
- resource.mutable_provider_id()->CopyFrom(
- resourceProvider.resourceProviderId.get());
+ resource.mutable_provider_id()->CopyFrom(resourceProvider.info.id());
resourceProviderResources_ += resource;
}
@@ -8996,8 +9002,7 @@ TEST_F(SlaveTest, ResourceProviderReconciliation)
v1::resource_provider::Call call;
call.set_type(v1::resource_provider::Call::UPDATE_STATE);
- call.mutable_resource_provider_id()->CopyFrom(
- resourceProvider.resourceProviderId.get());
+ call.mutable_resource_provider_id()->CopyFrom(resourceProvider.info.id());
v1::resource_provider::Call::UpdateState* updateState =
call.mutable_update_state();