You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/05/08 01:18:29 UTC
[1/5] mesos git commit: Made the 'SchedulerDriver' abort when
operation's 'id' field is set.
Repository: mesos
Updated Branches:
refs/heads/master e6298aef8 -> 52ae7f0e6
Made the 'SchedulerDriver' abort when operation's 'id' field is set.
Since the 'SchedulerDriver' does not support operation status updates,
this patch adds a check to the driver which will abort the scheduler
if the 'id' field is set in an offer operation.
Review: https://reviews.apache.org/r/66938/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/39b27e1b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/39b27e1b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/39b27e1b
Branch: refs/heads/master
Commit: 39b27e1bb90aab3f10c1203d8f4f65de4f32e774
Parents: e6298ae
Author: Greg Mann <gr...@mesosphere.io>
Authored: Mon May 7 17:31:55 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon May 7 17:31:55 2018 -0700
----------------------------------------------------------------------
src/sched/sched.cpp | 6 ++++
src/tests/scheduler_tests.cpp | 63 --------------------------------------
2 files changed, 6 insertions(+), 63 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/39b27e1b/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 620a3b2..baa6b0c 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1342,6 +1342,12 @@ protected:
// Setting accept.operations.
foreach (const Offer::Operation& _operation, operations) {
+ if (_operation.has_id()) {
+ ABORT("An offer operation's 'id' field was set, which is disallowed"
+ " because the SchedulerDriver cannot handle offer operation"
+ " status updates");
+ }
+
Offer::Operation* operation = accept->add_operations();
operation->CopyFrom(_operation);
}
http://git-wip-us.apache.org/repos/asf/mesos/blob/39b27e1b/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 749420a..5474907 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -1193,69 +1193,6 @@ TEST_P(SchedulerTest, OperationFeedbackValidationNoResourceProviderCapability)
}
-// Verifies invalidation of RESERVE operations with `id` set, when sent by a
-// `SchedulerDriver` framework.
-TEST_P(SchedulerTest, OperationFeedbackValidationSchedulerDriverFramework)
-{
- Clock::pause();
-
- master::Flags masterFlags = CreateMasterFlags();
- Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
- ASSERT_SOME(master);
-
- Owned<MasterDetector> detector = master.get()->createDetector();
-
- Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
- ASSERT_SOME(slave);
-
- FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
- frameworkInfo.add_roles("framework-role");
-
- MockScheduler scheduler;
- MesosSchedulerDriver driver(
- &scheduler,
- frameworkInfo,
- master.get()->pid,
- DEFAULT_CREDENTIAL);
-
- Future<FrameworkID> frameworkId;
- EXPECT_CALL(scheduler, registered(&driver, _, _))
- .WillOnce(FutureArg<1>(&frameworkId));
-
- Future<vector<Offer>> offers;
- EXPECT_CALL(scheduler, resourceOffers(&driver, _))
- .WillOnce(FutureArg<1>(&offers));
-
- driver.start();
-
- AWAIT_READY(frameworkId);
-
- Clock::advance(masterFlags.allocation_interval);
- Clock::settle();
-
- AWAIT_READY(offers);
- ASSERT_FALSE(offers->empty());
-
- Future<Nothing> schedulerError;
- EXPECT_CALL(scheduler, error(_, _))
- .WillOnce(FutureSatisfy(&schedulerError));
-
- const Offer& offer = offers->at(0);
-
- Resources resources = Resources::parse("cpus:0.1").get();
- resources.pushReservation(createDynamicReservationInfo(
- frameworkInfo.roles(1),
- frameworkInfo.principal()));
-
- Offer::Operation operation = RESERVE(resources);
- operation.mutable_id()->set_value("RESERVE_OPERATION");
-
- driver.acceptOffers({offer.id()}, {operation});
-
- AWAIT_READY(schedulerError);
-}
-
-
TEST_P(SchedulerTest, ShutdownExecutor)
{
Try<Owned<cluster::Master>> master = StartMaster();
[4/5] mesos git commit: Prevented master from sending operation
updates to v0 frameworks.
Posted by gr...@apache.org.
Prevented master from sending operation updates to v0 frameworks.
Review: https://reviews.apache.org/r/66995/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9d897259
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9d897259
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9d897259
Branch: refs/heads/master
Commit: 9d897259a39dc9f90e8fad191732a3fe45d63458
Parents: a570f94
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon May 7 17:33:32 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon May 7 18:15:30 2018 -0700
----------------------------------------------------------------------
src/master/master.cpp | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/9d897259/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f48a4f7..41862db 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2279,7 +2279,11 @@ void Master::drop(
<< " operation from framework " << *framework
<< ": " << message;
- if (operation.has_id()) {
+ // NOTE: The operation validation code should be refactored. Due to the order
+ // of validation, it's possible that this function will be called before the
+ // master has validated that operations from v0 frameworks do not have their
+ // ID set.
+ if (operation.has_id() && framework->http.isSome()) {
scheduler::Event update;
update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS);
[5/5] mesos git commit: Improved validation messages for some
operations.
Posted by gr...@apache.org.
Improved validation messages for some operations.
Review: https://reviews.apache.org/r/66939/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/52ae7f0e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/52ae7f0e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/52ae7f0e
Branch: refs/heads/master
Commit: 52ae7f0e6dd6952d243c37e8b8aa98ce7752a17d
Parents: 9d89725
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon May 7 17:33:56 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon May 7 18:15:31 2018 -0700
----------------------------------------------------------------------
src/master/validation.cpp | 8 ++++----
src/tests/master_validation_tests.cpp | 8 ++++----
2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/52ae7f0e/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 0c1c924..798fc79 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -2507,7 +2507,7 @@ Option<Error> validate(const Offer::Operation::CreateVolume& createVolume)
}
if (!Resources::hasResourceProvider(source)) {
- return Error("Does not have a resource provider");
+ return Error("'source' is not managed by a resource provider");
}
if (!Resources::isDisk(source, Resource::DiskInfo::Source::RAW)) {
@@ -2533,7 +2533,7 @@ Option<Error> validate(const Offer::Operation::DestroyVolume& destroyVolume)
}
if (!Resources::hasResourceProvider(volume)) {
- return Error("Does not have a resource provider");
+ return Error("'volume' is not managed by a resource provider");
}
if (!Resources::isDisk(volume, Resource::DiskInfo::Source::MOUNT) &&
@@ -2555,7 +2555,7 @@ Option<Error> validate(const Offer::Operation::CreateBlock& createBlock)
}
if (!Resources::hasResourceProvider(source)) {
- return Error("Does not have a resource provider");
+ return Error("'source' is not managed by a resource provider");
}
if (!Resources::isDisk(source, Resource::DiskInfo::Source::RAW)) {
@@ -2576,7 +2576,7 @@ Option<Error> validate(const Offer::Operation::DestroyBlock& destroyBlock)
}
if (!Resources::hasResourceProvider(block)) {
- return Error("Does not have a resource provider");
+ return Error("'block' is not managed by a resource provider");
}
if (!Resources::isDisk(block, Resource::DiskInfo::Source::BLOCK)) {
http://git-wip-us.apache.org/repos/asf/mesos/blob/52ae7f0e/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index fb1d8bd..6f2a78e 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -1795,7 +1795,7 @@ TEST(OperationValidationTest, CreateVolume)
ASSERT_SOME(error);
EXPECT_TRUE(strings::contains(
error->message,
- "Does not have a resource provider"));
+ "'source' is not managed by a resource provider"));
createVolume.mutable_source()->CopyFrom(disk1);
createVolume.set_target_type(Resource::DiskInfo::Source::BLOCK);
@@ -1834,7 +1834,7 @@ TEST(OperationValidationTest, DestroyVolume)
ASSERT_SOME(error);
EXPECT_TRUE(strings::contains(
error->message,
- "Does not have a resource provider"));
+ "'volume' is not managed by a resource provider"));
destroyVolume.mutable_volume()->CopyFrom(disk3);
@@ -1880,7 +1880,7 @@ TEST(OperationValidationTest, CreateBlock)
ASSERT_SOME(error);
EXPECT_TRUE(strings::contains(
error->message,
- "Does not have a resource provider"));
+ "'source' is not managed by a resource provider"));
}
@@ -1910,7 +1910,7 @@ TEST(OperationValidationTest, DestroyBlock)
ASSERT_SOME(error);
EXPECT_TRUE(strings::contains(
error->message,
- "Does not have a resource provider"));
+ "'block' is not managed by a resource provider"));
destroyBlock.mutable_block()->CopyFrom(disk3);
[3/5] mesos git commit: Made the master include the operation ID in
OPERATION_DROPPED updates.
Posted by gr...@apache.org.
Made the master include the operation ID in OPERATION_DROPPED updates.
Review: https://reviews.apache.org/r/66924/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a570f943
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a570f943
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a570f943
Branch: refs/heads/master
Commit: a570f9436b816d40ba3d01455211f5d61f77d66d
Parents: b4c541b
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon May 7 17:32:56 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon May 7 18:15:30 2018 -0700
----------------------------------------------------------------------
src/master/master.cpp | 21 ++-
src/master/master.hpp | 2 +-
src/tests/master_slave_reconciliation_tests.cpp | 175 +++++++++++++++++++
src/tests/mesos.hpp | 1 +
4 files changed, 193 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 28a0661..f48a4f7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -8417,8 +8417,7 @@ void Master::forward(
}
-void Master::updateOperationStatus(
- const UpdateOperationStatusMessage& update)
+void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
{
CHECK(update.has_slave_id())
<< "External resource provider is not supported yet";
@@ -8470,6 +8469,21 @@ void Master::updateOperationStatus(
return;
}
+ if (operation->info().has_id()) {
+ // Agents don't include the framework and operation IDs when sending
+ // operation status updates for dropped operations in response to a
+ // `ReconcileOperationsMessage`, but they can be deduced from the operation
+ // info kept on the master.
+
+ // Only operations done via the scheduler API can have an ID.
+ CHECK(operation->has_framework_id());
+
+ frameworkId = operation->framework_id();
+
+ update.mutable_status()->mutable_operation_id()->CopyFrom(
+ operation->info().id());
+ }
+
updateOperation(operation, update);
CHECK(operation->statuses_size() > 0);
@@ -8477,9 +8491,6 @@ void Master::updateOperationStatus(
const OperationStatus& latestStatus = *operation->statuses().rbegin();
if (operation->info().has_id()) {
- // Only operations done via the scheduler API can have an ID.
- CHECK_SOME(frameworkId);
-
// Forward the status update to the framework.
Framework* framework = getFramework(frameworkId.get());
http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 76e7763..5ec764b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -510,7 +510,7 @@ public:
ReconcileTasksMessage&& reconcileTasksMessage);
void updateOperationStatus(
- const UpdateOperationStatusMessage& update);
+ UpdateOperationStatusMessage&& update);
void exitedExecutor(
const process::UPID& from,
http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/tests/master_slave_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp
index 71e22af..937bab0 100644
--- a/src/tests/master_slave_reconciliation_tests.cpp
+++ b/src/tests/master_slave_reconciliation_tests.cpp
@@ -419,6 +419,181 @@ TEST_F(MasterSlaveReconciliationTest, ReconcileDroppedOperation)
reconcileOperationsMessage->operations(0).operation_uuid());
}
+// The master reconciles operations that are missing from a re-registering
+// agent.
+//
+// In this case, the `ApplyOperationMessage` is dropped, so the agent should
+// respond with an OPERATION_DROPPED operation status update.
+//
+// This test verifies that if an operation ID is set, the framework receives
+// the OPERATION_DROPPED operation status update.
+//
+// This is a regression test for MESOS-8784.
+TEST_F(
+ MasterSlaveReconciliationTest,
+ ForwardOperationDroppedAfterExplicitReconciliation)
+{
+ Clock::pause();
+
+ mesos::internal::master::Flags masterFlags = CreateMasterFlags();
+ Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+ ASSERT_SOME(master);
+
+ Future<UpdateSlaveMessage> updateSlaveMessage =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ auto detector = std::make_shared<StandaloneMasterDetector>(master.get()->pid);
+
+ mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+
+ // Disable HTTP authentication to simplify resource provider interactions.
+ slaveFlags.authenticate_http_readwrite = false;
+
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ // Wait for the agent to register.
+ AWAIT_READY(updateSlaveMessage);
+
+ // Start and register a resource provider.
+
+ v1::ResourceProviderInfo resourceProviderInfo;
+ resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+ resourceProviderInfo.set_name("test");
+
+ v1::Resource disk = v1::createDiskResource(
+ "200", "*", None(), None(), v1::createDiskSourceRaw());
+
+ Owned<v1::MockResourceProvider> resourceProvider(
+ new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk)));
+
+ // Make the mock resource provider respond to reconciliation events with
+ // OPERATION_DROPPED operation status updates.
+ auto reconcileOperations =
+ [&resourceProvider](
+ const v1::resource_provider::Event::ReconcileOperations& reconcile) {
+ foreach (const v1::UUID& operationUuid, reconcile.operation_uuids()) {
+ v1::resource_provider::Call call;
+
+ call.set_type(v1::resource_provider::Call::UPDATE_OPERATION_STATUS);
+ call.mutable_resource_provider_id()->CopyFrom(
+ resourceProvider->info.id());
+
+ v1::resource_provider::Call::UpdateOperationStatus*
+ updateOperationStatus = call.mutable_update_operation_status();
+
+ updateOperationStatus->mutable_status()->set_state(
+ v1::OPERATION_DROPPED);
+
+ updateOperationStatus->mutable_operation_uuid()->CopyFrom(
+ operationUuid);
+
+ resourceProvider->send(call);
+ }
+ };
+
+ EXPECT_CALL(*resourceProvider, reconcileOperations(_))
+ .WillOnce(Invoke(reconcileOperations));
+
+ Owned<EndpointDetector> endpointDetector(
+ mesos::internal::tests::resource_provider::createEndpointDetector(
+ slave.get()->pid));
+
+ updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ // NOTE: We need to resume the clock so that the resource provider can
+ // fully register.
+ Clock::resume();
+
+ ContentType contentType = ContentType::PROTOBUF;
+
+ resourceProvider->start(
+ endpointDetector, contentType, v1::DEFAULT_CREDENTIAL);
+
+ // Wait until the agent's resources have been updated to include the
+ // resource provider resources.
+ AWAIT_READY(updateSlaveMessage);
+
+ Clock::pause();
+
+ // Start a v1 framework.
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ // Ignore heartbeats.
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return());
+
+ Future<v1::scheduler::Event::Offers> offers;
+
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+ v1::scheduler::TestMesos mesos(master.get()->pid, contentType, scheduler);
+
+ AWAIT_READY(subscribed);
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+
+ const v1::Offer& offer = offers->offers(0);
+
+ // We'll drop the `ApplyOperationMessage` from the master to the agent.
+ Future<ApplyOperationMessage> applyOperationMessage =
+ DROP_PROTOBUF(ApplyOperationMessage(), master.get()->pid, _);
+
+ v1::Resources resources =
+ v1::Resources(offer.resources()).filter([](const v1::Resource& resource) {
+ return resource.has_provider_id();
+ });
+
+ ASSERT_FALSE(resources.empty());
+
+ v1::Resource reserved = *(resources.begin());
+ reserved.add_reservations()->CopyFrom(
+ v1::createDynamicReservationInfo(
+ frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
+
+ v1::OperationID operationId;
+ operationId.set_value("operation");
+
+ mesos.send(v1::createCallAccept(
+ frameworkId, offer, {v1::RESERVE(reserved, operationId.value())}));
+
+ AWAIT_READY(applyOperationMessage);
+
+ Future<v1::scheduler::Event::UpdateOperationStatus> operationDroppedUpdate;
+ EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
+ .WillOnce(FutureArg<1>(&operationDroppedUpdate));
+
+ // Simulate a spurious master change event (e.g., due to ZooKeeper
+ // expiration) at the slave to force re-registration.
+ detector->appoint(master.get()->pid);
+
+ // Advance the clock, so that the agent re-registers.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ // Wait for the framework to receive the OPERATION_DROPPED update.
+ AWAIT_READY(operationDroppedUpdate);
+
+ EXPECT_EQ(operationId, operationDroppedUpdate->status().operation_id());
+ EXPECT_EQ(v1::OPERATION_DROPPED, operationDroppedUpdate->status().state());
+}
+
// This test verifies that the master reconciles tasks that are
// missing from a reregistering slave. In this case, we trigger
// a race between the slave re-registration message and the launch
http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 8da3b02..b945edf 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -466,6 +466,7 @@ using mesos::v1::TaskInfo;
using mesos::v1::TaskGroupInfo;
using mesos::v1::TaskState;
using mesos::v1::TaskStatus;
+using mesos::v1::UUID;
using mesos::v1::WeightInfo;
} // namespace v1 {
[2/5] mesos git commit: Made the master drop operations with an ID on
agent default resources.
Posted by gr...@apache.org.
Made the master drop operations with an ID on agent default resources.
Review: https://reviews.apache.org/r/66992/
Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b4c541b4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b4c541b4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b4c541b4
Branch: refs/heads/master
Commit: b4c541b4d9677e2b84d8538f319a3dfe7987e327
Parents: 39b27e1
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon May 7 17:32:15 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon May 7 18:14:45 2018 -0700
----------------------------------------------------------------------
src/master/master.cpp | 8 +++
src/tests/master_tests.cpp | 75 +++++++++++++++++++++++
src/tests/operation_reconciliation_tests.cpp | 69 ++++++++++++++++++---
3 files changed, 144 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/mesos/blob/b4c541b4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 3b5d2eb..28a0661 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4235,6 +4235,14 @@ void Master::accept(
break;
}
+ if (getResourceProviderId(operation).isNone()) {
+ drop(framework,
+ operation,
+ "Operation requested feedback, but it affects resources not"
+ " managed by a resource provider");
+ break;
+ }
+
if (!slave->capabilities.resourceProvider) {
drop(framework,
operation,
http://git-wip-us.apache.org/repos/asf/mesos/blob/b4c541b4/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index e159573..0f6042c 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -8954,6 +8954,81 @@ TEST_F(MasterTest, OperationUpdateDuringFailover)
}
+// Tests that the master correctly drops an operation if the operation's 'id'
+// field is set and the operation affects resources not managed by a resource
+// provider.
+TEST_F(MasterTest, DropOperationWithIDAffectingDefaultResources)
+{
+ Clock::pause();
+
+ Try<Owned<cluster::Master>> master = StartMaster();
+ ASSERT_SOME(master);
+
+ mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+ Owned<MasterDetector> detector = master.get()->createDetector();
+ Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+ ASSERT_SOME(slave);
+
+ // Advance the clock to trigger agent registration.
+ Clock::advance(slaveFlags.registration_backoff_factor);
+
+ // Start a v1 framework.
+ auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+ v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+ frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+ EXPECT_CALL(*scheduler, connected(_))
+ .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+ Future<v1::scheduler::Event::Subscribed> subscribed;
+ EXPECT_CALL(*scheduler, subscribed(_, _))
+ .WillOnce(FutureArg<1>(&subscribed));
+
+ // Ignore heartbeats.
+ EXPECT_CALL(*scheduler, heartbeat(_))
+ .WillRepeatedly(Return());
+
+ Future<v1::scheduler::Event::Offers> offers;
+
+ EXPECT_CALL(*scheduler, offers(_, _))
+ .WillOnce(FutureArg<1>(&offers))
+ .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+ v1::scheduler::TestMesos mesos(
+ master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+ AWAIT_READY(subscribed);
+ v1::FrameworkID frameworkId(subscribed->framework_id());
+
+ AWAIT_READY(offers);
+ ASSERT_FALSE(offers->offers().empty());
+
+ const v1::Offer& offer = offers->offers(0);
+
+ Future<v1::scheduler::Event::UpdateOperationStatus> operationErrorUpdate;
+ EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
+ .WillOnce(FutureArg<1>(&operationErrorUpdate));
+
+ v1::Resource reserved = *(offer.resources().begin());
+ reserved.add_reservations()->CopyFrom(
+ v1::createDynamicReservationInfo(
+ frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
+
+ v1::OperationID operationId;
+ operationId.set_value("operation");
+
+ mesos.send(v1::createCallAccept(
+ frameworkId, offer, {v1::RESERVE(reserved, operationId.value())}));
+
+ // Wait for the framework to receive the OPERATION_ERROR update.
+ AWAIT_READY(operationErrorUpdate);
+
+ EXPECT_EQ(operationId, operationErrorUpdate->status().operation_id());
+ EXPECT_EQ(v1::OPERATION_ERROR, operationErrorUpdate->status().state());
+}
+
+
class MasterTestPrePostReservationRefinement
: public MasterTest,
public WithParamInterface<bool> {
http://git-wip-us.apache.org/repos/asf/mesos/blob/b4c541b4/src/tests/operation_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/operation_reconciliation_tests.cpp b/src/tests/operation_reconciliation_tests.cpp
index 9717e84..39cf188 100644
--- a/src/tests/operation_reconciliation_tests.cpp
+++ b/src/tests/operation_reconciliation_tests.cpp
@@ -74,14 +74,60 @@ TEST_P(OperationReconciliationTest, PendingOperation)
Try<Owned<cluster::Master>> master = StartMaster();
ASSERT_SOME(master);
+ Future<UpdateSlaveMessage> updateSlaveMessage =
+ FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
Owned<MasterDetector> detector = master.get()->createDetector();
mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+
+ // Disable HTTP authentication to simplify resource provider interactions.
+ slaveFlags.authenticate_http_readwrite = false;
+
Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
ASSERT_SOME(slave);
// Advance the clock to trigger agent registration.
Clock::advance(slaveFlags.registration_backoff_factor);
+ // Wait for the agent to register.
+ AWAIT_READY(updateSlaveMessage);
+
+ // Start and register a resource provider.
+
+ ResourceProviderInfo resourceProviderInfo;
+ resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+ resourceProviderInfo.set_name("test");
+
+ Resource disk =
+ createDiskResource("200", "*", None(), None(), createDiskSourceRaw());
+
+ Owned<MockResourceProvider> resourceProvider(
+ new MockResourceProvider(
+ resourceProviderInfo,
+ Resources(disk)));
+
+ Owned<EndpointDetector> endpointDetector(
+ mesos::internal::tests::resource_provider::createEndpointDetector(
+ slave.get()->pid));
+
+ updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+ // NOTE: We need to resume the clock so that the resource provider can
+ // fully register.
+ Clock::resume();
+
+ ContentType contentType = GetParam();
+
+ resourceProvider->start(endpointDetector, contentType, DEFAULT_CREDENTIAL);
+
+ // Wait until the agent's resources have been updated to include the
+ // resource provider resources.
+ AWAIT_READY(updateSlaveMessage);
+ ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+ ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+
+ Clock::pause();
+
auto scheduler = std::make_shared<MockHTTPScheduler>();
FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
@@ -114,18 +160,25 @@ TEST_P(OperationReconciliationTest, PendingOperation)
const Offer& offer = offers->offers(0);
const AgentID& agentId = offer.agent_id();
- OperationID operationId;
- operationId.set_value("operation");
-
- const Resources reservedResources =
- Resources(offer.resources())
- .pushReservation(createDynamicReservationInfo(
- frameworkInfo.roles(0), frameworkInfo.principal()));
-
// We'll drop the `ApplyOperationMessage` from the master to the agent.
Future<ApplyOperationMessage> applyOperationMessage =
DROP_PROTOBUF(ApplyOperationMessage(), master.get()->pid, _);
+ Resources resources =
+ Resources(offer.resources()).filter([](const Resource& resource) {
+ return resource.has_provider_id();
+ });
+
+ ASSERT_FALSE(resources.empty());
+
+ Resource reservedResources = *(resources.begin());
+ reservedResources.add_reservations()->CopyFrom(
+ createDynamicReservationInfo(
+ frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
+
+ OperationID operationId;
+ operationId.set_value("operation");
+
mesos.send(createCallAccept(
frameworkId,
offer,