You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by bb...@apache.org on 2018/02/19 14:19:15 UTC

[4/4] mesos git commit: Tested correct operation handling during master failover.

Tested correct operation handling during master failover.

Review: https://reviews.apache.org/r/65045/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/8c9184a0
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/8c9184a0
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/8c9184a0

Branch: refs/heads/master
Commit: 8c9184a03fa6b6fe842eb3554220d3ed2c327cdc
Parents: c6de89e
Author: Jan Schlicht <ja...@mesosphere.io>
Authored: Mon Feb 19 15:15:55 2018 +0100
Committer: Benjamin Bannier <bb...@apache.org>
Committed: Mon Feb 19 15:15:55 2018 +0100

----------------------------------------------------------------------
 src/tests/master_tests.cpp | 212 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 212 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/8c9184a0/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index 28663c7..3705fa7 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -8743,6 +8743,218 @@ TEST_F(MasterTest, UpdateSlaveMessageWithPendingOffers)
 }
 
 
+// Tests that the master correctly handles resource provider operations
+// that finished during a master failover.
+TEST_F(MasterTest, OperationUpdateDuringFailover)
+{
+  Clock::pause();
+
+  master::Flags masterFlags = CreateMasterFlags();
+
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // TODO(nfnt): Remove this once 'MockResourceProvider' supports
+  // authentication.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  // Set the resource provider capability.
+  vector<SlaveInfo::Capability> capabilities = slave::AGENT_CAPABILITIES();
+  SlaveInfo::Capability capability;
+  capability.set_type(SlaveInfo::Capability::RESOURCE_PROVIDER);
+  capabilities.push_back(capability);
+
+  slaveFlags.agent_features = SlaveCapabilities();
+  slaveFlags.agent_features->mutable_capabilities()->CopyFrom(
+      {capabilities.begin(), capabilities.end()});
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  StandaloneMasterDetector detector(master.get()->pid);
+  Try<Owned<cluster::Slave>> slave = StartSlave(&detector, slaveFlags);
+  ASSERT_SOME(slave);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Register a resource provider with the agent.
+  mesos::v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.resource_provider.test");
+  resourceProviderInfo.set_name("test");
+
+  v1::Resources resourceProviderResources = v1::createDiskResource(
+      "200",
+      "*",
+      None(),
+      None(),
+      v1::createDiskSourceRaw());
+
+  v1::MockResourceProvider resourceProvider(
+      resourceProviderInfo,
+      resourceProviderResources);
+
+  Owned<EndpointDetector> endpointDetector(
+      resource_provider::createEndpointDetector(slave.get()->pid));
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  resourceProvider.start(
+      endpointDetector,
+      ContentType::PROTOBUF,
+      v1::DEFAULT_CREDENTIAL);
+
+  AWAIT_READY(updateSlaveMessage);
+
+  // Start a framework to operate on offers.
+  MockScheduler sched;
+  TestingMesosSchedulerDriver driver(&sched, &detector);
+
+  // Expect a registration as well as a re-registration after master
+  // failover.
+  EXPECT_CALL(sched, registered(&driver, _, _))
+    .Times(2);
+
+  Future<vector<Offer>> offers;
+  EXPECT_CALL(sched, resourceOffers(&driver, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(Return()); // Ignore subsequent offers.
+
+  driver.start();
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->empty());
+  const Offer& offer = offers->front();
+
+  Option<Resource> rawDisk;
+
+  foreach (const Resource& resource, offer.resources()) {
+    if (resource.has_provider_id() &&
+        resource.has_disk() &&
+        resource.disk().has_source() &&
+        resource.disk().source().type() == Resource::DiskInfo::Source::RAW) {
+      rawDisk = resource;
+      break;
+    }
+  }
+
+  ASSERT_SOME(rawDisk);
+
+  Future<mesos::v1::resource_provider::Event::ApplyOperation> operation;
+  EXPECT_CALL(resourceProvider, applyOperation(_))
+    .WillOnce(FutureArg<0>(&operation));
+
+  driver.acceptOffers(
+      {offer.id()},
+      {CREATE_VOLUME(rawDisk.get(), Resource::DiskInfo::Source::MOUNT)});
+
+  AWAIT_READY(operation);
+
+  Option<mesos::v1::UUID> operationUUID;
+
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_OPERATIONS);
+
+    process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(ContentType::PROTOBUF);
+
+    Future<Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        headers,
+        serialize(ContentType::PROTOBUF, call),
+        stringify(ContentType::PROTOBUF));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+    Try<v1::master::Response> response_ =
+      deserialize<v1::master::Response>(ContentType::PROTOBUF, response->body);
+
+    ASSERT_SOME(response_);
+    const v1::master::Response::GetOperations& operations =
+      response_->get_operations();
+
+    ASSERT_EQ(1, operations.operations_size());
+    EXPECT_EQ(
+        mesos::v1::OperationState::OPERATION_PENDING,
+        operations.operations(0).latest_status().state());
+
+    operationUUID = operations.operations(0).uuid();
+  }
+
+  CHECK_SOME(operationUUID);
+
+  EXPECT_CALL(sched, disconnected(&driver));
+
+  // Drop the operation update for the finished operation.
+  // As we fail over the master immediately afterwards, we expect
+  // that the operation update will be part of the agent's
+  // `UPDATE_STATE` message when re-registering with the master.
+  Future<UpdateOperationStatusMessage> updateOperationStatusMessage =
+    DROP_PROTOBUF(UpdateOperationStatusMessage(), _, _);
+
+  // Finish the pending operation.
+  resourceProvider.operationDefault(operation.get());
+
+  AWAIT_READY(updateOperationStatusMessage);
+
+  // Fail over the master.
+  master->reset();
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  EXPECT_CALL(sched, offerRescinded(&driver, _))
+    .WillRepeatedly(Return());
+
+  // Start a new master and have agent and framework reconnect.
+  // The reconnected agent should report the converted resources.
+  master = StartMaster(masterFlags);
+  detector.appoint(master.get()->pid);
+
+  Clock::advance(slaveFlags.registration_backoff_factor);
+  Clock::settle();
+
+  AWAIT_READY(updateSlaveMessage);
+
+  {
+    v1::master::Call call;
+    call.set_type(v1::master::Call::GET_OPERATIONS);
+
+    process::http::Headers headers = createBasicAuthHeaders(DEFAULT_CREDENTIAL);
+    headers["Accept"] = stringify(ContentType::PROTOBUF);
+
+    Future<Response> response = process::http::post(
+        master.get()->pid,
+        "api/v1",
+        headers,
+        serialize(ContentType::PROTOBUF, call),
+        stringify(ContentType::PROTOBUF));
+
+    AWAIT_EXPECT_RESPONSE_STATUS_EQ(OK().status, response);
+
+    Try<v1::master::Response> response_ =
+      deserialize<v1::master::Response>(ContentType::PROTOBUF, response->body);
+
+    ASSERT_SOME(response_);
+    const v1::master::Response::GetOperations& operations =
+      response_->get_operations();
+
+    ASSERT_EQ(1, operations.operations_size());
+    EXPECT_EQ(
+        mesos::v1::OperationState::OPERATION_FINISHED,
+        operations.operations(0).latest_status().state());
+    EXPECT_EQ(operationUUID.get(), operations.operations(0).uuid());
+  }
+
+  driver.stop();
+  driver.join();
+}
+
+
 class MasterTestPrePostReservationRefinement
   : public MasterTest,
     public WithParamInterface<bool> {