You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/05/08 01:18:29 UTC

[1/5] mesos git commit: Made the 'SchedulerDriver' abort when operation's 'id' field is set.

Repository: mesos
Updated Branches:
  refs/heads/master e6298aef8 -> 52ae7f0e6


Made the 'SchedulerDriver' abort when operation's 'id' field is set.

Since the 'SchedulerDriver' does not support operation status updates,
this patch adds a check to the driver that aborts the scheduler if an
offer operation has its 'id' field set.

Review: https://reviews.apache.org/r/66938/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/39b27e1b
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/39b27e1b
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/39b27e1b

Branch: refs/heads/master
Commit: 39b27e1bb90aab3f10c1203d8f4f65de4f32e774
Parents: e6298ae
Author: Greg Mann <gr...@mesosphere.io>
Authored: Mon May 7 17:31:55 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon May 7 17:31:55 2018 -0700

----------------------------------------------------------------------
 src/sched/sched.cpp           |  6 ++++
 src/tests/scheduler_tests.cpp | 63 --------------------------------------
 2 files changed, 6 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/39b27e1b/src/sched/sched.cpp
----------------------------------------------------------------------
diff --git a/src/sched/sched.cpp b/src/sched/sched.cpp
index 620a3b2..baa6b0c 100644
--- a/src/sched/sched.cpp
+++ b/src/sched/sched.cpp
@@ -1342,6 +1342,12 @@ protected:
 
     // Setting accept.operations.
     foreach (const Offer::Operation& _operation, operations) {
+      if (_operation.has_id()) {
+        ABORT("An offer operation's 'id' field was set, which is disallowed"
+              " because the SchedulerDriver cannot handle offer operation"
+              " status updates");
+      }
+
       Offer::Operation* operation = accept->add_operations();
       operation->CopyFrom(_operation);
     }

http://git-wip-us.apache.org/repos/asf/mesos/blob/39b27e1b/src/tests/scheduler_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/scheduler_tests.cpp b/src/tests/scheduler_tests.cpp
index 749420a..5474907 100644
--- a/src/tests/scheduler_tests.cpp
+++ b/src/tests/scheduler_tests.cpp
@@ -1193,69 +1193,6 @@ TEST_P(SchedulerTest, OperationFeedbackValidationNoResourceProviderCapability)
 }
 
 
-// Verifies invalidation of RESERVE operations with `id` set, when sent by a
-// `SchedulerDriver` framework.
-TEST_P(SchedulerTest, OperationFeedbackValidationSchedulerDriverFramework)
-{
-  Clock::pause();
-
-  master::Flags masterFlags = CreateMasterFlags();
-  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
-  ASSERT_SOME(master);
-
-  Owned<MasterDetector> detector = master.get()->createDetector();
-
-  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get());
-  ASSERT_SOME(slave);
-
-  FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
-  frameworkInfo.add_roles("framework-role");
-
-  MockScheduler scheduler;
-  MesosSchedulerDriver driver(
-      &scheduler,
-      frameworkInfo,
-      master.get()->pid,
-      DEFAULT_CREDENTIAL);
-
-  Future<FrameworkID> frameworkId;
-  EXPECT_CALL(scheduler, registered(&driver, _, _))
-    .WillOnce(FutureArg<1>(&frameworkId));
-
-  Future<vector<Offer>> offers;
-  EXPECT_CALL(scheduler, resourceOffers(&driver, _))
-    .WillOnce(FutureArg<1>(&offers));
-
-  driver.start();
-
-  AWAIT_READY(frameworkId);
-
-  Clock::advance(masterFlags.allocation_interval);
-  Clock::settle();
-
-  AWAIT_READY(offers);
-  ASSERT_FALSE(offers->empty());
-
-  Future<Nothing> schedulerError;
-  EXPECT_CALL(scheduler, error(_, _))
-    .WillOnce(FutureSatisfy(&schedulerError));
-
-  const Offer& offer = offers->at(0);
-
-  Resources resources = Resources::parse("cpus:0.1").get();
-  resources.pushReservation(createDynamicReservationInfo(
-      frameworkInfo.roles(1),
-      frameworkInfo.principal()));
-
-  Offer::Operation operation = RESERVE(resources);
-  operation.mutable_id()->set_value("RESERVE_OPERATION");
-
-  driver.acceptOffers({offer.id()}, {operation});
-
-  AWAIT_READY(schedulerError);
-}
-
-
 TEST_P(SchedulerTest, ShutdownExecutor)
 {
   Try<Owned<cluster::Master>> master = StartMaster();


[4/5] mesos git commit: Prevented master from sending operation updates to v0 frameworks.

Posted by gr...@apache.org.
Prevented master from sending operation updates to v0 frameworks.

Review: https://reviews.apache.org/r/66995/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/9d897259
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/9d897259
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/9d897259

Branch: refs/heads/master
Commit: 9d897259a39dc9f90e8fad191732a3fe45d63458
Parents: a570f94
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon May 7 17:33:32 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon May 7 18:15:30 2018 -0700

----------------------------------------------------------------------
 src/master/master.cpp | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/9d897259/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index f48a4f7..41862db 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -2279,7 +2279,11 @@ void Master::drop(
                << " operation from framework " << *framework
                << ": " << message;
 
-  if (operation.has_id()) {
+  // NOTE: The operation validation code should be refactored. Due to the order
+  // of validation, it's possible that this function will be called before the
+  // master validates that operations from v0 frameworks should not have their
+  // ID set.
+  if (operation.has_id() && framework->http.isSome()) {
     scheduler::Event update;
     update.set_type(scheduler::Event::UPDATE_OPERATION_STATUS);
 


[5/5] mesos git commit: Improved validation messages for some operations.

Posted by gr...@apache.org.
Improved validation messages for some operations.

Review: https://reviews.apache.org/r/66939/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/52ae7f0e
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/52ae7f0e
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/52ae7f0e

Branch: refs/heads/master
Commit: 52ae7f0e6dd6952d243c37e8b8aa98ce7752a17d
Parents: 9d89725
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon May 7 17:33:56 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon May 7 18:15:31 2018 -0700

----------------------------------------------------------------------
 src/master/validation.cpp             | 8 ++++----
 src/tests/master_validation_tests.cpp | 8 ++++----
 2 files changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/52ae7f0e/src/master/validation.cpp
----------------------------------------------------------------------
diff --git a/src/master/validation.cpp b/src/master/validation.cpp
index 0c1c924..798fc79 100644
--- a/src/master/validation.cpp
+++ b/src/master/validation.cpp
@@ -2507,7 +2507,7 @@ Option<Error> validate(const Offer::Operation::CreateVolume& createVolume)
   }
 
   if (!Resources::hasResourceProvider(source)) {
-    return Error("Does not have a resource provider");
+    return Error("'source' is not managed by a resource provider");
   }
 
   if (!Resources::isDisk(source, Resource::DiskInfo::Source::RAW)) {
@@ -2533,7 +2533,7 @@ Option<Error> validate(const Offer::Operation::DestroyVolume& destroyVolume)
   }
 
   if (!Resources::hasResourceProvider(volume)) {
-    return Error("Does not have a resource provider");
+    return Error("'volume' is not managed by a resource provider");
   }
 
   if (!Resources::isDisk(volume, Resource::DiskInfo::Source::MOUNT) &&
@@ -2555,7 +2555,7 @@ Option<Error> validate(const Offer::Operation::CreateBlock& createBlock)
   }
 
   if (!Resources::hasResourceProvider(source)) {
-    return Error("Does not have a resource provider");
+    return Error("'source' is not managed by a resource provider");
   }
 
   if (!Resources::isDisk(source, Resource::DiskInfo::Source::RAW)) {
@@ -2576,7 +2576,7 @@ Option<Error> validate(const Offer::Operation::DestroyBlock& destroyBlock)
   }
 
   if (!Resources::hasResourceProvider(block)) {
-    return Error("Does not have a resource provider");
+    return Error("'block' is not managed by a resource provider");
   }
 
   if (!Resources::isDisk(block, Resource::DiskInfo::Source::BLOCK)) {

http://git-wip-us.apache.org/repos/asf/mesos/blob/52ae7f0e/src/tests/master_validation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_validation_tests.cpp b/src/tests/master_validation_tests.cpp
index fb1d8bd..6f2a78e 100644
--- a/src/tests/master_validation_tests.cpp
+++ b/src/tests/master_validation_tests.cpp
@@ -1795,7 +1795,7 @@ TEST(OperationValidationTest, CreateVolume)
   ASSERT_SOME(error);
   EXPECT_TRUE(strings::contains(
       error->message,
-      "Does not have a resource provider"));
+      "'source' is not managed by a resource provider"));
 
   createVolume.mutable_source()->CopyFrom(disk1);
   createVolume.set_target_type(Resource::DiskInfo::Source::BLOCK);
@@ -1834,7 +1834,7 @@ TEST(OperationValidationTest, DestroyVolume)
   ASSERT_SOME(error);
   EXPECT_TRUE(strings::contains(
       error->message,
-      "Does not have a resource provider"));
+      "'volume' is not managed by a resource provider"));
 
   destroyVolume.mutable_volume()->CopyFrom(disk3);
 
@@ -1880,7 +1880,7 @@ TEST(OperationValidationTest, CreateBlock)
   ASSERT_SOME(error);
   EXPECT_TRUE(strings::contains(
       error->message,
-      "Does not have a resource provider"));
+      "'source' is not managed by a resource provider"));
 }
 
 
@@ -1910,7 +1910,7 @@ TEST(OperationValidationTest, DestroyBlock)
   ASSERT_SOME(error);
   EXPECT_TRUE(strings::contains(
       error->message,
-      "Does not have a resource provider"));
+      "'block' is not managed by a resource provider"));
 
   destroyBlock.mutable_block()->CopyFrom(disk3);
 


[3/5] mesos git commit: Made the master include the operation ID in OPERATION_DROPPED updates.

Posted by gr...@apache.org.
Made the master include the operation ID in OPERATION_DROPPED updates.

Review: https://reviews.apache.org/r/66924/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/a570f943
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/a570f943
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/a570f943

Branch: refs/heads/master
Commit: a570f9436b816d40ba3d01455211f5d61f77d66d
Parents: b4c541b
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon May 7 17:32:56 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon May 7 18:15:30 2018 -0700

----------------------------------------------------------------------
 src/master/master.cpp                           |  21 ++-
 src/master/master.hpp                           |   2 +-
 src/tests/master_slave_reconciliation_tests.cpp | 175 +++++++++++++++++++
 src/tests/mesos.hpp                             |   1 +
 4 files changed, 193 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 28a0661..f48a4f7 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -8417,8 +8417,7 @@ void Master::forward(
 }
 
 
-void Master::updateOperationStatus(
-    const UpdateOperationStatusMessage& update)
+void Master::updateOperationStatus(UpdateOperationStatusMessage&& update)
 {
   CHECK(update.has_slave_id())
     << "External resource provider is not supported yet";
@@ -8470,6 +8469,21 @@ void Master::updateOperationStatus(
     return;
   }
 
+  if (operation->info().has_id()) {
+    // Agents don't include the framework and operation IDs when sending
+    // operation status updates for dropped operations in response to a
+    // `ReconcileOperationsMessage`, but they can be deduced from the operation
+    // info kept on the master.
+
+    // Only operations done via the scheduler API can have an ID.
+    CHECK(operation->has_framework_id());
+
+    frameworkId = operation->framework_id();
+
+    update.mutable_status()->mutable_operation_id()->CopyFrom(
+        operation->info().id());
+  }
+
   updateOperation(operation, update);
 
   CHECK(operation->statuses_size() > 0);
@@ -8477,9 +8491,6 @@ void Master::updateOperationStatus(
   const OperationStatus& latestStatus = *operation->statuses().rbegin();
 
   if (operation->info().has_id()) {
-    // Only operations done via the scheduler API can have an ID.
-    CHECK_SOME(frameworkId);
-
     // Forward the status update to the framework.
     Framework* framework = getFramework(frameworkId.get());
 

http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/master/master.hpp
----------------------------------------------------------------------
diff --git a/src/master/master.hpp b/src/master/master.hpp
index 76e7763..5ec764b 100644
--- a/src/master/master.hpp
+++ b/src/master/master.hpp
@@ -510,7 +510,7 @@ public:
       ReconcileTasksMessage&& reconcileTasksMessage);
 
   void updateOperationStatus(
-      const UpdateOperationStatusMessage& update);
+      UpdateOperationStatusMessage&& update);
 
   void exitedExecutor(
       const process::UPID& from,

http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/tests/master_slave_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_slave_reconciliation_tests.cpp b/src/tests/master_slave_reconciliation_tests.cpp
index 71e22af..937bab0 100644
--- a/src/tests/master_slave_reconciliation_tests.cpp
+++ b/src/tests/master_slave_reconciliation_tests.cpp
@@ -419,6 +419,181 @@ TEST_F(MasterSlaveReconciliationTest, ReconcileDroppedOperation)
       reconcileOperationsMessage->operations(0).operation_uuid());
 }
 
+// The master reconciles operations that are missing from a re-registering
+// agent.
+//
+// In this case, the `ApplyOperationMessage` is dropped, so the agent should
+// respond with an OPERATION_DROPPED operation status update.
+//
+// This test verifies that if an operation ID is set, the framework receives
+// the OPERATION_DROPPED operation status update.
+//
+// This is a regression test for MESOS-8784.
+TEST_F(
+    MasterSlaveReconciliationTest,
+    ForwardOperationDroppedAfterExplicitReconciliation)
+{
+  Clock::pause();
+
+  mesos::internal::master::Flags masterFlags = CreateMasterFlags();
+  Try<Owned<cluster::Master>> master = StartMaster(masterFlags);
+  ASSERT_SOME(master);
+
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  auto detector = std::make_shared<StandaloneMasterDetector>(master.get()->pid);
+
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the agent to register.
+  AWAIT_READY(updateSlaveMessage);
+
+  // Start and register a resource provider.
+
+  v1::ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+  resourceProviderInfo.set_name("test");
+
+  v1::Resource disk = v1::createDiskResource(
+      "200", "*", None(), None(), v1::createDiskSourceRaw());
+
+  Owned<v1::MockResourceProvider> resourceProvider(
+      new v1::MockResourceProvider(resourceProviderInfo, v1::Resources(disk)));
+
+  // Make the mock resource provider answer to reconciliation events with
+  // OPERATION_DROPPED operation status updates.
+  auto reconcileOperations =
+    [&resourceProvider](
+        const v1::resource_provider::Event::ReconcileOperations& reconcile) {
+      foreach (const v1::UUID& operationUuid, reconcile.operation_uuids()) {
+        v1::resource_provider::Call call;
+
+        call.set_type(v1::resource_provider::Call::UPDATE_OPERATION_STATUS);
+        call.mutable_resource_provider_id()->CopyFrom(
+            resourceProvider->info.id());
+
+        v1::resource_provider::Call::UpdateOperationStatus*
+          updateOperationStatus = call.mutable_update_operation_status();
+
+        updateOperationStatus->mutable_status()->set_state(
+            v1::OPERATION_DROPPED);
+
+        updateOperationStatus->mutable_operation_uuid()->CopyFrom(
+            operationUuid);
+
+        resourceProvider->send(call);
+      }
+    };
+
+  EXPECT_CALL(*resourceProvider, reconcileOperations(_))
+    .WillOnce(Invoke(reconcileOperations));
+
+  Owned<EndpointDetector> endpointDetector(
+      mesos::internal::tests::resource_provider::createEndpointDetector(
+          slave.get()->pid));
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // fully register.
+  Clock::resume();
+
+  ContentType contentType = ContentType::PROTOBUF;
+
+  resourceProvider->start(
+      endpointDetector, contentType, v1::DEFAULT_CREDENTIAL);
+
+  // Wait until the agent's resources have been updated to include the
+  // resource provider resources.
+  AWAIT_READY(updateSlaveMessage);
+
+  Clock::pause();
+
+  // Start a v1 framework.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  Future<v1::scheduler::Event::Offers> offers;
+
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+  v1::scheduler::TestMesos mesos(master.get()->pid, contentType, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+
+  // We'll drop the `ApplyOperationMessage` from the master to the agent.
+  Future<ApplyOperationMessage> applyOperationMessage =
+    DROP_PROTOBUF(ApplyOperationMessage(), master.get()->pid, _);
+
+  v1::Resources resources =
+    v1::Resources(offer.resources()).filter([](const v1::Resource& resource) {
+      return resource.has_provider_id();
+    });
+
+  ASSERT_FALSE(resources.empty());
+
+  v1::Resource reserved = *(resources.begin());
+  reserved.add_reservations()->CopyFrom(
+      v1::createDynamicReservationInfo(
+          frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
+
+  v1::OperationID operationId;
+  operationId.set_value("operation");
+
+  mesos.send(v1::createCallAccept(
+      frameworkId, offer, {v1::RESERVE(reserved, operationId.value())}));
+
+  AWAIT_READY(applyOperationMessage);
+
+  Future<v1::scheduler::Event::UpdateOperationStatus> operationDroppedUpdate;
+  EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
+    .WillOnce(FutureArg<1>(&operationDroppedUpdate));
+
+  // Simulate a spurious master change event (e.g., due to ZooKeeper
+  // expiration) at the slave to force re-registration.
+  detector->appoint(master.get()->pid);
+
+  // Advance the clock, so that the agent re-registers.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Wait for the framework to receive the OPERATION_DROPPED update.
+  AWAIT_READY(operationDroppedUpdate);
+
+  EXPECT_EQ(operationId, operationDroppedUpdate->status().operation_id());
+  EXPECT_EQ(v1::OPERATION_DROPPED, operationDroppedUpdate->status().state());
+}
+
 // This test verifies that the master reconciles tasks that are
 // missing from a reregistering slave. In this case, we trigger
 // a race between the slave re-registration message and the launch

http://git-wip-us.apache.org/repos/asf/mesos/blob/a570f943/src/tests/mesos.hpp
----------------------------------------------------------------------
diff --git a/src/tests/mesos.hpp b/src/tests/mesos.hpp
index 8da3b02..b945edf 100644
--- a/src/tests/mesos.hpp
+++ b/src/tests/mesos.hpp
@@ -466,6 +466,7 @@ using mesos::v1::TaskInfo;
 using mesos::v1::TaskGroupInfo;
 using mesos::v1::TaskState;
 using mesos::v1::TaskStatus;
+using mesos::v1::UUID;
 using mesos::v1::WeightInfo;
 
 } // namespace v1 {


[2/5] mesos git commit: Made the master drop operations with an ID on agent default resources.

Posted by gr...@apache.org.
Made the master drop operations with an ID on agent default resources.

Review: https://reviews.apache.org/r/66992/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b4c541b4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b4c541b4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b4c541b4

Branch: refs/heads/master
Commit: b4c541b4d9677e2b84d8538f319a3dfe7987e327
Parents: 39b27e1
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon May 7 17:32:15 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon May 7 18:14:45 2018 -0700

----------------------------------------------------------------------
 src/master/master.cpp                        |  8 +++
 src/tests/master_tests.cpp                   | 75 +++++++++++++++++++++++
 src/tests/operation_reconciliation_tests.cpp | 69 ++++++++++++++++++---
 3 files changed, 144 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b4c541b4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 3b5d2eb..28a0661 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4235,6 +4235,14 @@ void Master::accept(
               break;
             }
 
+            if (getResourceProviderId(operation).isNone()) {
+              drop(framework,
+                   operation,
+                   "Operation requested feedback, but it affects resources not"
+                   " managed by a resource provider");
+              break;
+            }
+
             if (!slave->capabilities.resourceProvider) {
               drop(framework,
                    operation,

http://git-wip-us.apache.org/repos/asf/mesos/blob/b4c541b4/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index e159573..0f6042c 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -8954,6 +8954,81 @@ TEST_F(MasterTest, OperationUpdateDuringFailover)
 }
 
 
+// Tests that the master correctly drops an operation if the operation's 'id'
+// field is set and the operation affects resources not managed by a resource
+// provider.
+TEST_F(MasterTest, DropOperationWithIDAffectingDefaultResources)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Start a v1 framework.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  Future<v1::scheduler::Event::Offers> offers;
+
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+
+  Future<v1::scheduler::Event::UpdateOperationStatus> operationErrorUpdate;
+  EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
+    .WillOnce(FutureArg<1>(&operationErrorUpdate));
+
+  v1::Resource reserved = *(offer.resources().begin());
+  reserved.add_reservations()->CopyFrom(
+      v1::createDynamicReservationInfo(
+          frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
+
+  v1::OperationID operationId;
+  operationId.set_value("operation");
+
+  mesos.send(v1::createCallAccept(
+      frameworkId, offer, {v1::RESERVE(reserved, operationId.value())}));
+
+  // Wait for the framework to receive the OPERATION_ERROR update.
+  AWAIT_READY(operationErrorUpdate);
+
+  EXPECT_EQ(operationId, operationErrorUpdate->status().operation_id());
+  EXPECT_EQ(v1::OPERATION_ERROR, operationErrorUpdate->status().state());
+}
+
+
 class MasterTestPrePostReservationRefinement
   : public MasterTest,
     public WithParamInterface<bool> {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b4c541b4/src/tests/operation_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/operation_reconciliation_tests.cpp b/src/tests/operation_reconciliation_tests.cpp
index 9717e84..39cf188 100644
--- a/src/tests/operation_reconciliation_tests.cpp
+++ b/src/tests/operation_reconciliation_tests.cpp
@@ -74,14 +74,60 @@ TEST_P(OperationReconciliationTest, PendingOperation)
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
   Owned<MasterDetector> detector = master.get()->createDetector();
   mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
   Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(slave);
 
   // Advance the clock to trigger agent registration.
   Clock::advance(slaveFlags.registration_backoff_factor);
 
+  // Wait for the agent to register.
+  AWAIT_READY(updateSlaveMessage);
+
+  // Start and register a resource provider.
+
+  ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+  resourceProviderInfo.set_name("test");
+
+  Resource disk =
+    createDiskResource("200", "*", None(), None(), createDiskSourceRaw());
+
+  Owned<MockResourceProvider> resourceProvider(
+      new MockResourceProvider(
+          resourceProviderInfo,
+          Resources(disk)));
+
+  Owned<EndpointDetector> endpointDetector(
+      mesos::internal::tests::resource_provider::createEndpointDetector(
+          slave.get()->pid));
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // fully register.
+  Clock::resume();
+
+  ContentType contentType = GetParam();
+
+  resourceProvider->start(endpointDetector, contentType, DEFAULT_CREDENTIAL);
+
+  // Wait until the agent's resources have been updated to include the
+  // resource provider resources.
+  AWAIT_READY(updateSlaveMessage);
+  ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+  ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+
+  Clock::pause();
+
   auto scheduler = std::make_shared<MockHTTPScheduler>();
 
   FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
@@ -114,18 +160,25 @@ TEST_P(OperationReconciliationTest, PendingOperation)
   const Offer& offer = offers->offers(0);
   const AgentID& agentId = offer.agent_id();
 
-  OperationID operationId;
-  operationId.set_value("operation");
-
-  const Resources reservedResources =
-    Resources(offer.resources())
-      .pushReservation(createDynamicReservationInfo(
-          frameworkInfo.roles(0), frameworkInfo.principal()));
-
   // We'll drop the `ApplyOperationMessage` from the master to the agent.
   Future<ApplyOperationMessage> applyOperationMessage =
     DROP_PROTOBUF(ApplyOperationMessage(), master.get()->pid, _);
 
+  Resources resources =
+    Resources(offer.resources()).filter([](const Resource& resource) {
+      return resource.has_provider_id();
+    });
+
+  ASSERT_FALSE(resources.empty());
+
+  Resource reservedResources = *(resources.begin());
+  reservedResources.add_reservations()->CopyFrom(
+      createDynamicReservationInfo(
+          frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
+
+  OperationID operationId;
+  operationId.set_value("operation");
+
   mesos.send(createCallAccept(
       frameworkId,
       offer,