You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/02/19 14:19:15 UTC
[4/4] mesos git commit: Tested correct operation handling during
master failover.
Tested correct operation handling during master failover.
Review: https://reviews.apache.org/r/65045/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8c9184a0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8c9184a0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8c9184a0
Branch: refs/heads/master
Commit: 8c9184a03fa6b6fe842eb3554220d3ed2c327cdc
Parents: c6de89e
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Mon Feb 19 15:15:55 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Mon Feb 19 15:15:55 2018 +0100
----------------------------------------------------------------------
src/tests/master_tests.cpp | 212 ++++++++++++++++++++++++++++++++++++++++
1 file changed, 212 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/8c9184a0/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 28663c7..3705fa7 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -8743,6 +8743,218 @@ TEST_F(MasterTest, UpdateSlaveMessageWithPendingOffers)
}
+// Tests that the master correctly handles resource provider operations
+// that finished during a master failover.
+//
+// Outline of the scenario exercised below:
+//   1. Start a master and an agent advertising the `RESOURCE_PROVIDER`
+//      capability, then attach a mock resource provider exposing a
+//      RAW disk resource.
+//   2. Have a framework accept an offer with a `CREATE_VOLUME`
+//      operation on that disk and verify via `GET_OPERATIONS` that the
+//      master reports it as `OPERATION_PENDING`.
+//   3. Finish the operation while dropping the resulting
+//      `UpdateOperationStatusMessage`, then fail over the master.
+//   4. Verify via `GET_OPERATIONS` against the new master that the same
+//      operation (matched by UUID) is reported as `OPERATION_FINISHED`.
+TEST_F(MasterTest, OperationUpdateDuringFailover)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // TODO(nfnt): Remove this once 'MockResourceProvider' supports
+  // authentication.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  // The clock is paused, so advance it past the registration backoff
+  // to let the agent register.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Register a resource provider with the agent.
+  mesos::v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.resource_provider.test");
+  resourceProviderInfo.set_name("test");
+
+  v1::Resources resourceProviderResources = v1::createDiskResource(
+      "200",
+      "*",
+      None(),
+      None(),
+      v1::createDiskSourceRaw());
+
+  v1::MockResourceProvider resourceProvider(
+      resourceProviderInfo,
+      resourceProviderResources);
+
+  Owned<EndpointDetector> endpointDetector(
+      resource_provider::createEndpointDetector(slave.get()->pid));
+
+  // Re-arm the future so that we wait for the `UpdateSlaveMessage`
+  // sent after the resource provider has been started.
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  resourceProvider.start(
+      endpointDetector,
+      ContentType::PROTOBUF,
+      v1::DEFAULT_CREDENTIAL);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Start a framework to operate on offers.
+  MockScheduler sched;
+  TestingMesosSchedulerDriver driver(&sched, &detector);
+
+  // Expect a registration as well as a re-registration after master
+  // failover.
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(2);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+  const Offer& offer = offers->front();
+
+  // Pick the first offered resource that belongs to a resource
+  // provider and is a RAW disk.
+  Option<Resource> rawDisk;
+
+  foreach (const Resource& resource, offer.resources()) {
+    if (resource.has_provider_id() &&
+        resource.has_disk() &&
+        resource.disk().has_source() &&
+        resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
+      rawDisk = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(rawDisk);
+
+  // Capture the operation handed to the resource provider instead of
+  // applying it right away; it is finished explicitly further below.
+  Future<mesos::v1::resource_provider::Event::ApplyOperation> operation;
+  EXPECT_CALL(resourceProvider, applyOperation(_))
+    .WillOnce(FutureArg<0>(&operation));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE_VOLUME(rawDisk.get(), Resource::DiskInfo::Source::MOUNT)});
+
+  AWAIT_READY(operation);
+
+  // UUID of the pending operation as reported by the first master; used
+  // to identify the same operation after the failover.
+  Option<mesos::v1::UUID> operationUUID;
+
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_OPERATIONS);
+
+    process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(ContentType::PROTOBUF);
+
+    Future<Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        headers,
+        serialize(ContentType::PROTOBUF, call),
+        stringify(ContentType::PROTOBUF));
+
+    // Use the fatal variant: the response body is dereferenced right
+    // below, which must not happen if the response never became ready.
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(OK().status, response);
+
+    Try<v1::master::Response> response_ =
+      deserialize<v1::master::Response>(ContentType::PROTOBUF, response->body);
+
+    ASSERT_SOME(response_);
+    const v1::master::Response::GetOperations& operations =
+      response_->get_operations();
+
+    ASSERT_EQ(1, operations.operations_size());
+    EXPECT_EQ(
+        mesos::v1::OperationState::OPERATION_PENDING,
+        operations.operations(0).latest_status().state());
+
+    operationUUID = operations.operations(0).uuid();
+  }
+
+  // Fail only this test (rather than aborting the whole test binary)
+  // if no operation UUID could be obtained.
+  ASSERT_SOME(operationUUID);
+
+  EXPECT_CALL(sched, disconnected(&driver));
+
+  // Drop the operation update for the finished operation.
+  // As we fail over the master immediately afterwards, we expect
+  // that the operation update will be part of the agent's
+  // `UPDATE_STATE` message when re-registering with the master.
+  Future<UpdateOperationStatusMessage> updateOperationStatusMessage =
+    DROP_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  // Finish the pending operation.
+  resourceProvider.operationDefault(operation.get());
+
+  AWAIT_READY(updateOperationStatusMessage);
+
+  // Fail over the master.
+  master->reset();
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillRepeatedly(Return());
+
+  // Start a new master and have agent and framework reconnect.
+  // The reconnected agent should report the converted resources.
+  master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+  detector.appoint(master.get()->pid);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::settle();
+
+  AWAIT_READY(updateSlaveMessage);
+
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_OPERATIONS);
+
+    process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(ContentType::PROTOBUF);
+
+    Future<Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        headers,
+        serialize(ContentType::PROTOBUF, call),
+        stringify(ContentType::PROTOBUF));
+
+    // Fatal variant for the same reason as above: the body is
+    // dereferenced immediately afterwards.
+    AWAIT_ASSERT_RESPONSE_STATUS_EQ(OK().status, response);
+
+    Try<v1::master::Response> response_ =
+      deserialize<v1::master::Response>(ContentType::PROTOBUF, response->body);
+
+    ASSERT_SOME(response_);
+    const v1::master::Response::GetOperations& operations =
+      response_->get_operations();
+
+    // The new master must know exactly the one operation, now finished,
+    // and it must be the same operation that was pending before.
+    ASSERT_EQ(1, operations.operations_size());
+    EXPECT_EQ(
+        mesos::v1::OperationState::OPERATION_FINISHED,
+        operations.operations(0).latest_status().state());
+    EXPECT_EQ(operationUUID.get(), operations.operations(0).uuid());
+  }
+
+  driver.stop();
+  driver.join();
+}
+
+
class MasterTestPrePostReservationRefinement
: public MasterTest,
public WithParamInterface<bool> {