You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/01/18 10:30:31 UTC
[2/2] mesos git commit: Added a resource provider test case.
Added a resource provider test case.
Review: https://reviews.apache.org/r/65126/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8cf6b088
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8cf6b088
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8cf6b088
Branch: refs/heads/master
Commit: 8cf6b0882e8e393702c673e2b29ac6781cec0b85
Parents: 75659c9
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Wed Jan 17 15:23:02 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Thu Jan 18 11:28:12 2018 +0100
----------------------------------------------------------------------
src/tests/master_tests.cpp | 128 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 128 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8cf6b088/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 0edf224..d01f3fb 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -8608,6 +8608,134 @@ TEST_F_TEMP_DISABLED_ON_WINDOWS(MasterTest, RegistryGcByCount)
}
+// This test verifies that updating a resource provider's state
+// that isn't motivated by (re-)registration (e.g. when adding
+// resources) is correctly handled by agent and master: Offers are
+// rescinded and new resources are offered.
+TEST_F(MasterTest, UpdateSlaveMessageWithPendingOffers)
+{
+ Clock::pause();
+
+ // Start master and agent.
+ master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Owned<MasterDetector> detector = master.get()->createDetector();
+
+ Future<UpdateSlaveMessage> updateSlaveMessage =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ slave::Flags slaveFlags = CreateSlaveFlags();
+ slaveFlags.authenticate_http_readwrite = false;
+
+ // Set the resource provider capability.
+ vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+ SlaveInfo::Capability capability;
+ capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+ capabilities.push_back(capability);
+
+ slaveFlags.agent_features = SlaveCapabilities();
+ slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+ {capabilities.begin(), capabilities.end()});
+
+ Try<Owned<cluster::Slave>> agent = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(agent);
+
+ Clock::advance(slaveFlags.registration_backoff_factor);
+ Clock::settle();
+ AWAIT_READY(updateSlaveMessage);
+
+ updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ mesos::v1::ResourceProviderInfo resourceProviderInfo;
+ resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+ resourceProviderInfo.set_name("test");
+
+ v1::Resource disk1 = v1::createDiskResource(
+ "200", "*", None(), None(), v1::createDiskSourceRaw());
+
+ Owned<v1::MockResourceProvider> resourceProvider(
+ new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk1)));
+
+ // Start and register a resource provider with a single disk resources.
+ Owned<EndpointDetector> endpointDetector(
+ resource_provider::createEndpointDetector(agent.get()->pid));
+
+ resourceProvider->start(
+ endpointDetector, ContentType::PROTOBUF, v1::DEFAULT_CREDENTIAL);
+
+ AWAIT_READY(updateSlaveMessage);
+ ASSERT_TRUE(resourceProvider->info.has_id());
+
+ disk1.mutable_provider_id()->CopyFrom(resourceProvider->info.id());
+
+ // Start and register a framework.
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(v1::DEFAULT_FRAMEWORK_INFO));
+
+ EXPECT_CALL(*scheduler, subscribed(_, _));
+
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return()); // Ignore heartbeats.
+
+ Future<v1::scheduler::Event::Offers> offers;
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+
+ v1::Resources offeredResources = offers->offers(0).resources();
+ offeredResources.unallocate();
+
+ EXPECT_TRUE(offeredResources.contains(disk1));
+
+ updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ // Add another resource to the resource provider. By sending
+ // 'UPDATE_STATE', the resource provider manager will be notified
+ // of the new resource.
+ v1::Resource disk2 = v1::createDiskResource(
+ "100", "*", None(), None(), v1::createDiskSourceBlock());
+ disk2.mutable_provider_id()->CopyFrom(resourceProvider->info.id());
+
+ v1::resource_provider::Call call;
+ call.mutable_resource_provider_id()->CopyFrom(resourceProvider->info.id());
+ call.set_type(v1::resource_provider::Call::UPDATE_STATE);
+
+ v1::resource_provider::Call::UpdateState* updateState =
+ call.mutable_update_state();
+ updateState->add_resources()->CopyFrom(disk1);
+ updateState->add_resources()->CopyFrom(disk2);
+ updateState->mutable_resource_version_uuid()->set_value(
+ id::UUID::random().toBytes());
+
+ EXPECT_CALL(*scheduler, rescind(_, _));
+
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers));
+
+ resourceProvider->send(call);
+
+ AWAIT_READY(updateSlaveMessage);
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+
+ offeredResources = offers->offers(0).resources();
+ offeredResources.unallocate();
+
+ EXPECT_TRUE(offeredResources.contains(disk1));
+ EXPECT_TRUE(offeredResources.contains(disk2));
+}
+
+
class MasterTestPrePostReservationRefinement
: public MasterTest,
public WithParamInterface<bool> {