You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mesos.apache.org by gr...@apache.org on 2018/05/08 01:18:30 UTC

[2/5] mesos git commit: Made the master drop operations with an ID on agent default resources.

Made the master drop operations with an ID on agent default resources.

Review: https://reviews.apache.org/r/66992/


Project: http://git-wip-us.apache.org/repos/asf/mesos/repo
Commit: http://git-wip-us.apache.org/repos/asf/mesos/commit/b4c541b4
Tree: http://git-wip-us.apache.org/repos/asf/mesos/tree/b4c541b4
Diff: http://git-wip-us.apache.org/repos/asf/mesos/diff/b4c541b4

Branch: refs/heads/master
Commit: b4c541b4d9677e2b84d8538f319a3dfe7987e327
Parents: 39b27e1
Author: Gaston Kleiman <ga...@mesosphere.io>
Authored: Mon May 7 17:32:15 2018 -0700
Committer: Greg Mann <gr...@gmail.com>
Committed: Mon May 7 18:14:45 2018 -0700

----------------------------------------------------------------------
 src/master/master.cpp                        |  8 +++
 src/tests/master_tests.cpp                   | 75 +++++++++++++++++++++++
 src/tests/operation_reconciliation_tests.cpp | 69 ++++++++++++++++++---
 3 files changed, 144 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/mesos/blob/b4c541b4/src/master/master.cpp
----------------------------------------------------------------------
diff --git a/src/master/master.cpp b/src/master/master.cpp
index 3b5d2eb..28a0661 100644
--- a/src/master/master.cpp
+++ b/src/master/master.cpp
@@ -4235,6 +4235,14 @@ void Master::accept(
               break;
             }
 
+            if (getResourceProviderId(operation).isNone()) {
+              drop(framework,
+                   operation,
+                   "Operation requested feedback, but it affects resources not"
+                   " managed by a resource provider");
+              break;
+            }
+
             if (!slave->capabilities.resourceProvider) {
               drop(framework,
                    operation,

http://git-wip-us.apache.org/repos/asf/mesos/blob/b4c541b4/src/tests/master_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/master_tests.cpp b/src/tests/master_tests.cpp
index e159573..0f6042c 100644
--- a/src/tests/master_tests.cpp
+++ b/src/tests/master_tests.cpp
@@ -8954,6 +8954,81 @@ TEST_F(MasterTest, OperationUpdateDuringFailover)
 }
 
 
+// Tests that the master correctly drops an operation if the operation's 'id'
+// field is set and the operation affects resources not managed by a resource
+// provider.
+TEST_F(MasterTest, DropOperationWithIDAffectingDefaultResources)
+{
+  Clock::pause();
+
+  Try<Owned<cluster::Master>> master = StartMaster();
+  ASSERT_SOME(master);
+
+  mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+  Owned<MasterDetector> detector = master.get()->createDetector();
+  Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
+  ASSERT_SOME(slave);
+
+  // Advance the clock to trigger agent registration.
+  Clock::advance(slaveFlags.registration_backoff_factor);
+
+  // Start a v1 framework.
+  auto scheduler = std::make_shared<v1::MockHTTPScheduler>();
+
+  v1::FrameworkInfo frameworkInfo = v1::DEFAULT_FRAMEWORK_INFO;
+  frameworkInfo.set_roles(0, DEFAULT_TEST_ROLE);
+
+  EXPECT_CALL(*scheduler, connected(_))
+    .WillOnce(v1::scheduler::SendSubscribe(frameworkInfo));
+
+  Future<v1::scheduler::Event::Subscribed> subscribed;
+  EXPECT_CALL(*scheduler, subscribed(_, _))
+    .WillOnce(FutureArg<1>(&subscribed));
+
+  // Ignore heartbeats.
+  EXPECT_CALL(*scheduler, heartbeat(_))
+    .WillRepeatedly(Return());
+
+  Future<v1::scheduler::Event::Offers> offers;
+
+  EXPECT_CALL(*scheduler, offers(_, _))
+    .WillOnce(FutureArg<1>(&offers))
+    .WillRepeatedly(v1::scheduler::DeclineOffers());
+
+  v1::scheduler::TestMesos mesos(
+      master.get()->pid, ContentType::PROTOBUF, scheduler);
+
+  AWAIT_READY(subscribed);
+  v1::FrameworkID frameworkId(subscribed->framework_id());
+
+  AWAIT_READY(offers);
+  ASSERT_FALSE(offers->offers().empty());
+
+  const v1::Offer& offer = offers->offers(0);
+
+  Future<v1::scheduler::Event::UpdateOperationStatus> operationErrorUpdate;
+  EXPECT_CALL(*scheduler, updateOperationStatus(_, _))
+    .WillOnce(FutureArg<1>(&operationErrorUpdate));
+
+  v1::Resource reserved = *(offer.resources().begin());
+  reserved.add_reservations()->CopyFrom(
+      v1::createDynamicReservationInfo(
+          frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
+
+  v1::OperationID operationId;
+  operationId.set_value("operation");
+
+  mesos.send(v1::createCallAccept(
+      frameworkId, offer, {v1::RESERVE(reserved, operationId.value())}));
+
+  // Wait for the framework to receive the OPERATION_ERROR update.
+  AWAIT_READY(operationErrorUpdate);
+
+  EXPECT_EQ(operationId, operationErrorUpdate->status().operation_id());
+  EXPECT_EQ(v1::OPERATION_ERROR, operationErrorUpdate->status().state());
+}
+
+
 class MasterTestPrePostReservationRefinement
   : public MasterTest,
     public WithParamInterface<bool> {

http://git-wip-us.apache.org/repos/asf/mesos/blob/b4c541b4/src/tests/operation_reconciliation_tests.cpp
----------------------------------------------------------------------
diff --git a/src/tests/operation_reconciliation_tests.cpp b/src/tests/operation_reconciliation_tests.cpp
index 9717e84..39cf188 100644
--- a/src/tests/operation_reconciliation_tests.cpp
+++ b/src/tests/operation_reconciliation_tests.cpp
@@ -74,14 +74,60 @@ TEST_P(OperationReconciliationTest, PendingOperation)
   Try<Owned<cluster::Master>> master = StartMaster();
   ASSERT_SOME(master);
 
+  Future<UpdateSlaveMessage> updateSlaveMessage =
+    FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
   Owned<MasterDetector> detector = master.get()->createDetector();
   mesos::internal::slave::Flags slaveFlags = CreateSlaveFlags();
+
+  // Disable HTTP authentication to simplify resource provider interactions.
+  slaveFlags.authenticate_http_readwrite = false;
+
   Try<Owned<cluster::Slave>> slave = StartSlave(detector.get(), slaveFlags);
   ASSERT_SOME(slave);
 
   // Advance the clock to trigger agent registration.
   Clock::advance(slaveFlags.registration_backoff_factor);
 
+  // Wait for the agent to register.
+  AWAIT_READY(updateSlaveMessage);
+
+  // Start and register a resource provider.
+
+  ResourceProviderInfo resourceProviderInfo;
+  resourceProviderInfo.set_type("org.apache.mesos.rp.test");
+  resourceProviderInfo.set_name("test");
+
+  Resource disk =
+    createDiskResource("200", "*", None(), None(), createDiskSourceRaw());
+
+  Owned<MockResourceProvider> resourceProvider(
+      new MockResourceProvider(
+          resourceProviderInfo,
+          Resources(disk)));
+
+  Owned<EndpointDetector> endpointDetector(
+      mesos::internal::tests::resource_provider::createEndpointDetector(
+          slave.get()->pid));
+
+  updateSlaveMessage = FUTURE_PROTOBUF(UpdateSlaveMessage(), _, _);
+
+  // NOTE: We need to resume the clock so that the resource provider can
+  // fully register.
+  Clock::resume();
+
+  ContentType contentType = GetParam();
+
+  resourceProvider->start(endpointDetector, contentType, DEFAULT_CREDENTIAL);
+
+  // Wait until the agent's resources have been updated to include the
+  // resource provider resources.
+  AWAIT_READY(updateSlaveMessage);
+  ASSERT_TRUE(updateSlaveMessage->has_resource_providers());
+  ASSERT_EQ(1, updateSlaveMessage->resource_providers().providers_size());
+
+  Clock::pause();
+
   auto scheduler = std::make_shared<MockHTTPScheduler>();
 
   FrameworkInfo frameworkInfo = DEFAULT_FRAMEWORK_INFO;
@@ -114,18 +160,25 @@ TEST_P(OperationReconciliationTest, PendingOperation)
   const Offer& offer = offers->offers(0);
   const AgentID& agentId = offer.agent_id();
 
-  OperationID operationId;
-  operationId.set_value("operation");
-
-  const Resources reservedResources =
-    Resources(offer.resources())
-      .pushReservation(createDynamicReservationInfo(
-          frameworkInfo.roles(0), frameworkInfo.principal()));
-
   // We'll drop the `ApplyOperationMessage` from the master to the agent.
   Future<ApplyOperationMessage> applyOperationMessage =
     DROP_PROTOBUF(ApplyOperationMessage(), master.get()->pid, _);
 
+  Resources resources =
+    Resources(offer.resources()).filter([](const Resource& resource) {
+      return resource.has_provider_id();
+    });
+
+  ASSERT_FALSE(resources.empty());
+
+  Resource reservedResources = *(resources.begin());
+  reservedResources.add_reservations()->CopyFrom(
+      createDynamicReservationInfo(
+          frameworkInfo.roles(0), DEFAULT_CREDENTIAL.principal()));
+
+  OperationID operationId;
+  operationId.set_value("operation");
+
   mesos.send(createCallAccept(
       frameworkId,
       offer,