You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/05/08 01:18:30 UTC
[2/5] mesos git commit: Made the master drop operations with an ID on
agent default resources.
Made the master drop operations with an ID on agent default resources.
Review: https://reviews.apache.org/r/66992/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b4c541b4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b4c541b4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b4c541b4
Branch: refs/heads/master
Commit: b4c541b4d9677e2b84d8538f319a3dfe7987e327
Parents: 39b27e1
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon May 7 17:32:15 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon May 7 18:14:45 2018 -0700
----------------------------------------------------------------------
src/master/master.cpp | 8 +++
src/tests/master_tests.cpp | 75 +++++++++++++++++++++++
src/tests/operation_reconciliation_tests.cpp | 69 ++++++++++++++++++---
3 files changed, 144 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b4c541b4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 3b5d2eb..28a0661 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4235,6 +4235,14 @@ void Master::accept(
break;
}
+ if (getResourceProviderId(operation).isNone()) {
+ drop(framework,
+ operation,
+ "Operation requested feedback, but it affects resources not"
+ " managed by a resource provider");
+ break;
+ }
+
if (!slave->capabilities.resourceProvider) {
drop(framework,
operation,
http://git-wip-us.apache.org/repos/asf/mesos/blob/b4c541b4/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index e159573..0f6042c 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -8954,6 +8954,81 @@ TEST_F(MasterTest, OperationUpdateDuringFailover)
}
+// Tests that the master correctly drops an operation if the operation's 'id'
+// field is set and the operation affects resources not managed by a resource
+// provider.
+TEST_F(MasterTest, DropOperationWithIDAffectingDefaultResources)
+{
+ Clock::pause();
+
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ // Start a v1 framework.
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ // Ignore heartbeats.
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return());
+
+ Future<v1::scheduler::Event::Offers> offers;
+
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+ AWAIT_READY(subscribed);
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+
+ const v1::Offer& offer = offers->offers(0);
+
+ Future<v1::scheduler::Event::UpdateOperationStatus> operationErrorUpdate;
+ EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
+ .WillOnce(FutureArg<1>(&operationErrorUpdate));
+
+ v1::Resource reserved = *(offer.resources().begin());
+ reserved.add_reservations()->CopyFrom(
+ v1::createDynamicReservationInfo(
+ frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
+
+ v1::OperationID operationId;
+ operationId.set_value("operation");
+
+ mesos.send(v1::createCallAccept(
+ frameworkId, offer, {v1::RESERVE(reserved, operationId.value())}));
+
+ // Wait for the framework to receive the OPERATION_ERROR update.
+ AWAIT_READY(operationErrorUpdate);
+
+ EXPECT_EQ(operationId, operationErrorUpdate->status().operation_id());
+ EXPECT_EQ(v1::OPERATION_ERROR, operationErrorUpdate->status().state());
+}
+
+
class MasterTestPrePostReservationRefinement
: public MasterTest,
public WithParamInterface<bool> {
http://git-wip-us.apache.org/repos/asf/mesos/blob/b4c541b4/src/tests/operation_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/operation_reconciliation_tests.cpp b/src/tests/operation_reconciliation_tests.cpp
index 9717e84..39cf188 100644
--- a/src/tests/operation_reconciliation_tests.cpp
+++ b/src/tests/operation_reconciliation_tests.cpp
@@ -74,14 +74,60 @@ TEST_P(OperationReconciliationTest, PendingOperation)
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
+ Future<UpdateSlaveMessage> updateSlaveMessage =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
Owned<MasterDetector> detector = master.get()->createDetector();
mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+
+ // Disable HTTP authentication to simplify resource provider interactions.
+ slaveFlags.authenticate_http_readwrite = false;
+
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
// Advance the clock to trigger agent registration.
Clock::advance(slaveFlags.registration_backoff_factor);
+ // Wait for the agent to register.
+ AWAIT_READY(updateSlaveMessage);
+
+ // Start and register a resource provider.
+
+ ResourceProviderInfo resourceProviderInfo;
+ resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+ resourceProviderInfo.set_name("test");
+
+ Resource disk =
+ createDiskResource("200", "*", None(), None(), createDiskSourceRaw());
+
+ Owned<MockResourceProvider> resourceProvider(
+ new MockResourceProvider(
+ resourceProviderInfo,
+ Resources(disk)));
+
+ Owned<EndpointDetector> endpointDetector(
+ mesos::internal::tests::resource_provider::createEndpointDetector(
+ slave.get()->pid));
+
+ updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ // NOTE: We need to resume the clock so that the resource provider can
+ // fully register.
+ Clock::resume();
+
+ ContentType contentType = GetParam();
+
+ resourceProvider->start(endpointDetector, contentType, DEFAULT_CREDENTIAL);
+
+ // Wait until the agent's resources have been updated to include the
+ // resource provider resources.
+ AWAIT_READY(updateSlaveMessage);
+ ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+ ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+
+ Clock::pause();
+
auto scheduler = std::make_shared<MockHTTPScheduler>();
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
@@ -114,18 +160,25 @@ TEST_P(OperationReconciliationTest, PendingOperation)
const Offer& offer = offers->offers(0);
const AgentID& agentId = offer.agent_id();
- OperationID operationId;
- operationId.set_value("operation");
-
- const Resources reservedResources =
- Resources(offer.resources())
- .pushReservation(createDynamicReservationInfo(
- frameworkInfo.roles(0), frameworkInfo.principal()));
-
// We'll drop the `ApplyOperationMessage` from the master to the agent.
Future<ApplyOperationMessage> applyOperationMessage =
DROP_PROTOBUF(ApplyOperationMessage(), master.get()->pid, _);
+ Resources resources =
+ Resources(offer.resources()).filter([](const Resource& resource) {
+ return resource.has_provider_id();
+ });
+
+ ASSERT_FALSE(resources.empty());
+
+ Resource reservedResources = *(resources.begin());
+ reservedResources.add_reservations()->CopyFrom(
+ createDynamicReservationInfo(
+ frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
+
+ OperationID operationId;
+ operationId.set_value("operation");
+
mesos.send(createCallAccept(
frameworkId,
offer,